flink-user mailing list archives

From Rico Bergmann <i...@ricobergmann.de>
Subject Re: Flink to ingest from Kafka to HDFS?
Date Thu, 20 Aug 2015 09:58:05 GMT
My ideas for checkpointing:

I think writing to the destination should not depend on the checkpoint mechanism (otherwise
the output would never be written to the destination if checkpointing is disabled). Instead
I would keep track of the offsets of written and checkpointed records. When recovering, you would
then delete or overwrite the records after the last checkpointed offset. (But I don't really know
whether this is as simple as I wrote it ;-) ).
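
To make the offset idea concrete, here is a minimal sketch in plain Java against the local
file system. It is not Flink's API: the class OffsetTrackingWriter and its methods are
hypothetical names, and it assumes the target file system can truncate a file, which may not
hold for every HDFS setup.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Flink: writes records immediately and
// remembers the file length at each checkpoint.
class OffsetTrackingWriter {
    private final Path file;

    OffsetTrackingWriter(Path file) {
        this.file = file;
    }

    // Append one record right away, independent of any checkpoint.
    void write(String record) throws IOException {
        Files.write(file, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Called when a checkpoint is taken: the returned byte offset would be
    // stored as part of the checkpointed state.
    long snapshotOffset() throws IOException {
        return Files.exists(file) ? Files.size(file) : 0L;
    }

    // Called on recovery: drop everything written after the checkpointed offset,
    // so replayed records do not show up twice.
    void restore(long checkpointedOffset) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(checkpointedOffset);
        }
    }
}
```

Records written after the last snapshot are lost on restore() and have to be replayed by the
source, which is what a replayable source like Kafka would do after recovery.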

Regarding the rolling files, I would suggest making the values of the user-defined partitioning
function part of the path or file name. Writing records is then basically:
Extract the partition to write to, then add the record to a queue for this partition. Each
queue has an output format assigned to it. On flushing, the output file is opened, the content
of the queue is written to it, and the file is closed.
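
A rough sketch of this queue-per-partition idea, again in plain Java rather than as a Flink
OutputFormat. The class name PartitionedBufferingWriter, the fixed file name "part-0" and the
use of a java.util.function.Function as the partitioning function are assumptions for
illustration; the sketch also assumes that the partition value is safe to use as a directory
name.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Flink: buffers records per partition and
// writes each buffer to its own directory on flush.
class PartitionedBufferingWriter<T> {
    private final Path baseDir;
    private final Function<T, String> partitioner;
    private final Map<String, List<T>> queues = new HashMap<>();

    PartitionedBufferingWriter(Path baseDir, Function<T, String> partitioner) {
        this.baseDir = baseDir;
        this.partitioner = partitioner;
    }

    // Extract the partition and add the record to that partition's queue.
    void write(T record) {
        queues.computeIfAbsent(partitioner.apply(record), k -> new ArrayList<>()).add(record);
    }

    // For each partition: open the file, append the queued records, close it.
    void flush() throws IOException {
        for (Map.Entry<String, List<T>> entry : queues.entrySet()) {
            Path dir = baseDir.resolve(entry.getKey()); // partition value becomes part of the path
            Files.createDirectories(dir);
            try (BufferedWriter out = Files.newBufferedWriter(dir.resolve("part-0"),
                    StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                for (T record : entry.getValue()) {
                    out.write(record.toString());
                    out.write('\n');
                }
            }
        }
        queues.clear();
    }
}
```

With a partitioner such as r -> r.substring(0, 10) on records that start with a date, each day
ends up in its own directory. Rolling would then amount to choosing a new file name per
partition once some size or time limit is reached, which this sketch leaves out.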

Does this sound reasonable?

> On 20.08.2015 at 10:40, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Yes, this seems like a good approach. We should probably not reuse the KeySelector for
> this but maybe a more use-case specific type of function that can create a desired filename
> from an input object.
> This is only the first part, though. The hard bit would be implementing rolling files
> and also integrating it with Flink's checkpointing mechanism. For integration with checkpointing
> you could maybe use "staging files": all elements are put into a staging file. Then, when
> the notification about a completed checkpoint is received, the contents of this file would
> be moved (or appended) to the actual destination.
> Do you have any ideas about the rolling files/checkpointing?
>> On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <info@ricobergmann.de> wrote:
>> I'm thinking about implementing this. 
>> After looking into the Flink code, I would basically subclass FileOutputFormat into, let's
>> say, a KeyedFileOutputFormat that gets an additional KeySelector object. The string that the
>> KeySelector returns is then appended to the path in the file system.
>> Do you think this is a good approach?
>> Greets. Rico. 
>>> On 16.08.2015 at 19:56, Stephan Ewen <sewen@apache.org> wrote:
>>> If you are up for it, this would be a very nice addition to Flink, a great contribution.
>>>> On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>> Hi!
>>>> This should definitely be possible in Flink. Pretty much exactly like you
>>>> describe it.
>>>> You need a custom version of the HDFS sink with some logic for when to roll over
>>>> to a new file.
>>>> You can also make the sink "exactly once" by integrating it with the checkpointing.
>>>> For that, you would probably need to keep the current path and output stream offsets as of
>>>> the last checkpoint, so you can resume from that offset and overwrite records to avoid
>>>> duplicates. If that is not possible, you would probably buffer records between checkpoints
>>>> and only write on checkpoints.
>>>> Greetings,
>>>> Stephan
>>>>> On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn <hpzorn@gmail.com> wrote:
>>>>> Hi,
>>>>> Did anybody think of (mis-)using Flink streaming as an alternative to
>>>>> Apache Flume just for ingesting data from Kafka (or other streaming sources) to HDFS?
>>>>> Knowing that Flink can read from Kafka and write to HDFS, I assume it should be possible,
>>>>> but is this a good idea?
>>>>> Flume basically is about consuming data from somewhere, peeking into
>>>>> each record and then directing it to a specific directory/file in HDFS reliably. I've seen
>>>>> there is a FlumeSink, but would it be possible to get the same functionality with
>>>>> Flink alone?
>>>>> I've skimmed through the documentation and found the option to split
>>>>> the output by key and the possibility to add multiple sinks. As I understand, Flink programs
>>>>> are generally static, so it would not be possible to add/remove sinks at runtime?
>>>>> So you would need to implement a custom sink directing the records to
>>>>> different files based on a key (e.g. date)? Would it be difficult to implement things like
>>>>> rolling outputs etc.? Or is it better to just use Flume?
>>>>> Best, 
>>>>> Hans-Peter
