flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink to ingest from Kafka to HDFS?
Date Wed, 26 Aug 2015 08:31:33 GMT
BTW: We are currently working on adding a rolling-file HDFS sink to Flink
that will initially work very similarly to what Flume gives you. If I
understand it correctly, Flume may have duplicates in the output from
incomplete flushes on failures.

We actually have a design to extend this later into a proper "exactly once"
sink, integrated with Flink's checkpointing, which properly discards
duplicates by offset tracking and truncating/compacting.
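
To make the "truncating" part a bit more concrete, here is a minimal,
hypothetical sketch of discarding un-checkpointed output on recovery
(assuming Hadoop 2.7+, where FileSystem.truncate(Path, long) exists; the
class and method names are just for illustration):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class TruncateOnRestore {

    /**
     * Cuts an in-progress file back to the length recorded in the last
     * successful checkpoint, so anything written after it (possible
     * duplicates from an incomplete flush) is discarded.
     */
    public static void restore(FileSystem fs, Path inProgressFile, long checkpointedLength) throws IOException {
        final boolean done = fs.truncate(inProgressFile, checkpointedLength);
        if (!done) {
            // Truncate may complete asynchronously; wait until the file has the expected length.
            while (fs.getFileStatus(inProgressFile).getLen() != checkpointedLength) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private TruncateOnRestore() {
    }
}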


On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn <hpzorn@gmail.com> wrote:

> Hi Stephan,
>
> even though I started the discussion, I was just trying to estimate the
> effort. In that project they finally opted to use flume with a Kafka
> channel.
>
> Best, Hans-Peter
>
> On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr>
> wrote:
>
>> Hi Stephan,
>>
>>
>>
>> I do not have a Kafka->HDFS solution, but I do have a streaming sink that
>> writes to HDFS (external, text hive table) with auto-partitioning and
>> rolling files. However, it does not take care of checkpointing and may have
>> flushing issues if some partitions are seldom seen.
>>
>>
>>
>> I’m not sure it will save you much time, especially given that it has not
>> really been used yet.
>>
>>
>>
>> Code is provided with no copyright and no warranty!
>>
>>
>>
>> import java.io.BufferedOutputStream;
>> import java.io.IOException;
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.commons.io.IOUtils;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
>> import org.apache.hive.hcatalog.data.schema.HCatSchema;
>> import org.joda.time.DateTime;
>>
>> /**
>>  * This sink streams data to an HDFS directory (hive external table) with a size limit (rolling files)
>>  * and automatic partitioning. To be able to read the file content while it is still being written, an
>>  * idea is to add a char(1) field in the last position of the hive line and to check whether it has the
>>  * proper value when read (if not, the line is not complete).
>>  *
>>  * @author alinz
>>  */
>> public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {
>>
>>     /**
>>      * The Class StreamingFile encapsulates an open output HDFS file.
>>      */
>>     public static class StreamingFile {
>>
>>         /** base directory */
>>         private final String rootPath;
>>
>>         /** file name prefix */
>>         private final String prefix;
>>
>>         /** file path */
>>         private Path path;
>>
>>         /** open output stream */
>>         private BufferedOutputStream stream;
>>
>>         /** current size */
>>         private long size = 0;
>>
>>         /** current file number */
>>         private long nbFile = 0;
>>
>>         /** instant of the last write on this stream; if the interval is too long, content is flushed */
>>         private long lastInvoke;
>>
>>         /**
>>          * Instantiates a new streaming file.
>>          * @param rootPath destination path
>>          * @param prefix file name prefix
>>          * @throws IOException cannot open file
>>          */
>>         public StreamingFile(String rootPath, String prefix) throws IOException {
>>             super();
>>             this.rootPath = rootPath;
>>             this.prefix = prefix;
>>             lastInvoke = 0; // always flush the first record
>>             open();
>>         }
>>
>>         /**
>>          * Creates the destination file on the file system.
>>          * @throws IOException issue when opening the file
>>          */
>>         private void open() throws IOException {
>>             this.path = new Path(rootPath, prefix + nbFile);
>>             final FileSystem filesys = path.getFileSystem();
>>             filesys.mkdirs(path.getParent());
>>             stream = new BufferedOutputStream(filesys.create(path, true));
>>         }
>>
>>         /**
>>          * Closes the stream.
>>          */
>>         public void closeStream() {
>>             IOUtils.closeQuietly(stream);
>>             stream = null; // NOPMD
>>         }
>>
>>         /**
>>          * Writes data into the stream.
>>          * @param data data to write
>>          * @param maxSize max file size; the file is split when it is reached
>>          * @throws IOException writing issue
>>          */
>>         public void writeStream(byte[] data, long maxSize) throws IOException {
>>             stream.write(data);
>>             // If the source is too slow, flush the data. With this approach we do not always get the
>>             // "last flush", especially for old partitions.
>>             // TODO: if this becomes an issue, implement a time-out thread.
>>             final long maxDelayFlush = 100;
>>             final long invokeTime = System.currentTimeMillis();
>>             if (invokeTime - lastInvoke > maxDelayFlush) {
>>                 stream.flush();
>>             }
>>             lastInvoke = invokeTime;
>>             if (incTaille(data.length) >= maxSize) {
>>                 split();
>>             }
>>         }
>>
>>         /**
>>          * Increments the file size.
>>          * @param amount what to add
>>          * @return the new size
>>          */
>>         private long incTaille(long amount) {
>>             size += amount;
>>             return size;
>>         }
>>
>>         /**
>>          * Closes the current file and opens a new one.
>>          * @throws IOException issue when opening the file
>>          */
>>         private void split() throws IOException {
>>             closeStream();
>>             nbFile++;
>>             open();
>>             size = 0;
>>         }
>>
>>         /**
>>          * Flushes the stream.
>>          * @throws IOException I/O issue
>>          */
>>         public void flushStream() throws IOException {
>>             stream.flush();
>>         }
>>     }
>>
>>     /** SUID. */
>>     private static final long serialVersionUID = 1L;
>>
>>     // Shared fields
>>
>>     /** output hive table schema */
>>     private final HCatSchema outputSchema;
>>
>>     /** field delimiter */
>>     private final char delim;
>>
>>     /** HDFS root path */
>>     private final String hdfsPath;
>>
>>     /** max file size */
>>     private final long maxSize;
>>
>>     // Subtask fields
>>
>>     /** file name prefix for a subtask; prevents conflicts with another subtask or a previous run */
>>     private transient String namePrefix;
>>
>>     /** map of streams indexed by encountered partition */
>>     private transient Map<String, StreamingFile> streams;
>>
>>     /** instant of the last periodic flush */
>>     private transient long lastFlushAll;
>>
>>     /**
>>      * Builds a streamer.
>>      * @param outputSchema output record schema (without partition)
>>      * @param delim field delimiter
>>      * @param hdfsPath HDFS destination path
>>      * @param maxSize max size of a file (the file is rolled when it is reached)
>>      */
>>     public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
>>         super();
>>         this.outputSchema = outputSchema;
>>         this.delim = delim;
>>         this.hdfsPath = hdfsPath;
>>         this.maxSize = maxSize;
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void open(Configuration parameters) throws Exception { // NOPMD
>>         // The prefix is unique for a run and a subtask, to avoid conflicts
>>         namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
>>         streams = new HashMap<String, StreamingFile>();
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void close() throws Exception { // NOPMD
>>         for (final StreamingFile file : streams.values()) {
>>             file.closeStream();
>>         }
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
>>         final String partition = value.f0;
>>         final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
>>         // Do we already have an open data stream for this partition?
>>         StreamingFile file = streams.get(partition);
>>         if (file == null) {
>>             file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
>>             streams.put(partition, file);
>>         }
>>         file.writeStream(record.getBytes(), maxSize);
>>
>>         // Periodically flush all streams
>>         final long invoke = System.currentTimeMillis();
>>         final long flushPeriod = 10000;
>>         if (invoke - lastFlushAll > flushPeriod) {
>>             lastFlushAll = invoke;
>>             for (final StreamingFile stream : streams.values()) {
>>                 stream.flushStream();
>>             }
>>         }
>>     }
>> }
>>
>>
>>
>> And the missing HiveFileOutputFormat.getRecordLine:
>>
>>
>>
>>     /**
>>      * Shared method to transform a hive record into a text line.<br>
>>      * TODO: partitions should not be part of the line. But since they are in the last position, it is a minor issue.<br>
>>      * TODO: the use of deprecated types is more convenient, but I should get rid of it.<br>
>>      * @param record hive record
>>      * @param schema line schema
>>      * @param separator field delimiter
>>      * @return corresponding line, ended with \n
>>      */
>>     @SuppressWarnings("deprecation")
>>     // because it's so convenient
>>     public static String getRecordLine(DefaultHCatRecord record, HCatSchema schema, char separator) {
>>         final int fldNumbr = Math.min(schema.size(), record.size());
>>         final StringBuffer line = new StringBuffer();
>>
>>         for (int idx = 0; idx < fldNumbr; idx++) {
>>             final Object fieldVal = record.get(idx);
>>             final String strFieldVal;
>>             if (fieldVal == null) {
>>                 strFieldVal = "";
>>             }
>>             else {
>>                 switch (schema.get(idx).getType()) {
>>                     case DOUBLE:
>>                     case FLOAT:
>>                     case DECIMAL:
>>                     case BIGINT:
>>                     case INT:
>>                     case SMALLINT:
>>                     case TINYINT:
>>                     case CHAR:
>>                     case STRING:
>>                     case VARCHAR:
>>                     case BOOLEAN:
>>                     case DATE:
>>                     case TIMESTAMP:
>>                         strFieldVal = fieldVal.toString();
>>                         break;
>>                     case ARRAY:
>>                     case MAP:
>>                     case STRUCT:
>>                     case BINARY:
>>                     default:
>>                         throw new IllegalArgumentException("Complex Hive types (" + schema.get(idx).getTypeString()
>>                             + ") are not supported");
>>                 }
>>             }
>>             line.append(strFieldVal);
>>             if (idx < fldNumbr - 1) {
>>                 line.append(separator);
>>             }
>>         }
>>         line.append('\n');
>>         return line.toString();
>>     }
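>>
>> In case it helps, here is a rough, hypothetical example of how the sink
>> could be wired into a streaming job (the source, the parse() helper, the
>> schema and the paths are made up for illustration; only HiveStreamOutput
>> and getRecordLine come from the code above):
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.socketTextStream("localhost", 9999)
>>     .map(new MapFunction<String, Tuple2<String, DefaultHCatRecord>>() {
>>         @Override
>>         public Tuple2<String, DefaultHCatRecord> map(String line) throws Exception {
>>             // parse() is a user-provided helper turning a raw line into (partition, hive record)
>>             return parse(line);
>>         }
>>     })
>>     .addSink(new HiveStreamOutput(outputSchema, ';', "hdfs:///data/mytable", 128L * 1024L * 1024L));
>> env.execute("stream to hive external table");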
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> From: Rico Bergmann [mailto:info@ricobergmann.de]
>> Sent: Wednesday, 26 August 2015 07:49
>> To: user@flink.apache.org
>> Subject: Re: Flink to ingest from Kafka to HDFS?
>>
>>
>>
>> Hi!
>>
>>
>>
>> Sorry, I won't be able to implement this soon. I just shared my ideas on
>> this.
>>
>>
>>
>> Greets. Rico.
>>
>>
>> On 25.08.2015 at 17:52, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Hi Rico!
>>
>>
>>
>> Can you give us an update on your status here? We actually need something
>> like this as well (and pretty urgently), so we would jump in and implement
>> it, unless you have something already.
>>
>>
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> BTW: This is becoming a dev discussion; maybe we should move it to that list...
>>
>>
>>
>> On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Yes, one needs exactly that: a mechanism to seek the output stream back to
>> the last checkpointed position, in order to overwrite duplicates.
>>
>>
>>
>> I think HDFS is not going to make this easy; there is basically no seek
>> for writes. Not sure how to solve this, other than writing to tmp files
>> and copying them upon success.
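>>
>> One possible shape of the "tmp files, then copy on success" idea, as a
>> rough hypothetical sketch (Hadoop FileSystem API; the class and method
>> names are made up):
>>
>> import java.io.IOException;
>>
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>>
>> public final class TmpThenRename {
>>
>>     /**
>>      * Records are written to an ".inprogress" file; once the surrounding
>>      * checkpoint succeeds, the file is renamed to its final name, so
>>      * readers only ever see completed files.
>>      */
>>     public static void commit(FileSystem fs, Path inProgress, Path finalName) throws IOException {
>>         if (!fs.rename(inProgress, finalName)) {
>>             throw new IOException("Could not rename " + inProgress + " to " + finalName);
>>         }
>>     }
>>
>>     private TmpThenRename() {
>>     }
>> }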
>>
>>
>>
>> Apache Flume must have solved this issue in some way; it may be worth
>> looking into how they solved it.
>>
>>
>>
>> On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann <info@ricobergmann.de>
>> wrote:
>>
>> My ideas for checkpointing:
>>
>>
>>
>> I think writing to the destination should not depend on the checkpoint
>> mechanism (otherwise the output would never be written to the destination
>> if checkpointing is disabled). Instead, I would keep the offsets of written
>> and checkpointed records. When recovering, you would then somehow delete or
>> overwrite the records after that offset. (But I don't really know whether
>> this is as simple as I wrote it ;-) ).
>>
>>
>>
>> Regarding the rolling files, I would suggest making the values of the
>> user-defined partitioning function part of the path or file name. Writing
>> records is then basically: extract the partition to write to, then add the
>> record to a queue for this partition. Each queue has an output format
>> assigned to it. On flushing, the output file is opened, the content of the
>> queue is written to it, and the file is then closed.
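>>
>> As a rough illustration of this per-partition queue idea (a hypothetical
>> sketch; types are simplified and the class name is made up):
>>
>> import java.util.ArrayDeque;
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Queue;
>>
>> public class PartitionedBuffer {
>>
>>     /** one queue per value of the user-defined partitioning function */
>>     private final Map<String, Queue<String>> queues = new HashMap<String, Queue<String>>();
>>
>>     /** Adds a record to the queue of its partition. */
>>     public void add(String partition, String record) {
>>         Queue<String> queue = queues.get(partition);
>>         if (queue == null) {
>>             queue = new ArrayDeque<String>();
>>             queues.put(partition, queue);
>>         }
>>         queue.add(record);
>>     }
>>
>>     /** On flush, each partition's queue is drained; the caller would write it to that partition's file. */
>>     public Map<String, Queue<String>> drainAll() {
>>         final Map<String, Queue<String>> snapshot = new HashMap<String, Queue<String>>(queues);
>>         queues.clear();
>>         return snapshot;
>>     }
>> }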
>>
>>
>>
>> Does this sound reasonable?
>>
>>
>> On 20.08.2015 at 10:40, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>
>> Yes, this seems like a good approach. We should probably not reuse the
>> KeySelector for this, but maybe use a more use-case-specific type of
>> function that can create a desired filename from an input object.
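>>
>> A hypothetical shape for such a function (the name and signature are
>> assumptions, not an existing Flink interface):
>>
>> import java.io.Serializable;
>>
>> public interface FileNameSelector<T> extends Serializable {
>>
>>     /** Returns the file name (or relative path, e.g. a date-based partition) the record should go to. */
>>     String getFileNameFor(T record);
>> }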
>>
>>
>>
>> This is only the first part, though. The hard bit would be implementing
>> rolling files and also integrating it with Flink's checkpointing mechanism.
>> For integration with checkpointing you could maybe use "staging files": all
>> elements are put into a staging file, and then, when the notification about
>> a completed checkpoint is received, the contents of this file would be
>> moved (or appended) to the actual destination.
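>>
>> A minimal sketch of that staging-file idea, using plain local-file I/O for
>> brevity (the hook in Flink would be the checkpoint-completed notification;
>> the class and method names are assumptions):
>>
>> import java.io.File;
>> import java.io.IOException;
>> import java.nio.file.Files;
>> import java.nio.file.StandardOpenOption;
>>
>> public class StagingFileSink {
>>
>>     private final File staging;
>>     private final File destination;
>>
>>     public StagingFileSink(File staging, File destination) {
>>         this.staging = staging;
>>         this.destination = destination;
>>     }
>>
>>     /** Called for every record between checkpoints: append it to the staging file only. */
>>     public synchronized void write(String line) throws IOException {
>>         Files.write(staging.toPath(), (line + "\n").getBytes(),
>>             StandardOpenOption.CREATE, StandardOpenOption.APPEND);
>>     }
>>
>>     /** Called on the completed-checkpoint notification: move the staged contents to the destination. */
>>     public synchronized void onCheckpointComplete() throws IOException {
>>         if (!staging.exists()) {
>>             return; // nothing was staged since the last checkpoint
>>         }
>>         final byte[] staged = Files.readAllBytes(staging.toPath());
>>         Files.write(destination.toPath(), staged,
>>             StandardOpenOption.CREATE, StandardOpenOption.APPEND);
>>         Files.delete(staging.toPath());
>>     }
>> }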
>>
>>
>>
>> Do you have any ideas about the rolling files/checkpointing?
>>
>>
>>
>> On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <info@ricobergmann.de> wrote:
>>
>> I'm thinking about implementing this.
>>
>>
>>
>> After looking into the Flink code, I would basically subclass
>> FileOutputFormat into, let's say, a KeyedFileOutputFormat that gets an
>> additional KeySelector object. The string returned by the KeySelector is
>> then appended to the path in the file system.
>>
>>
>>
>> Do you think this is a good approach?
>>
>>
>>
>> Greets. Rico.
>>
>>
>> On 16.08.2015 at 19:56, Stephan Ewen <sewen@apache.org> wrote:
>>
>> If you are up for it, this would be a very nice addition to Flink, a
>> great contribution :-)
>>
>>
>>
>> On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Hi!
>>
>>
>>
>> This should definitely be possible in Flink. Pretty much exactly like you
>> describe it.
>>
>>
>>
>> You need a custom version of the HDFS sink with some logic for when to
>> roll over to a new file.
>>
>>
>>
>> You can also make the sink "exactly once" by integrating it with the
>> checkpointing. For that, you would probably need to keep the current path
>> and output stream offsets as of the last checkpoint, so you can resume from
>> that offset and overwrite records to avoid duplicates. If that is not
>> possible, you would probably buffer records between checkpoints and only
>> write on checkpoints.
>>
>>
>>
>> Greetings,
>> Stephan
>>
>>
>>
>>
>>
>>
>>
>> On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn <hpzorn@gmail.com>
>> wrote:
>>
>> Hi,
>>
>>
>>
>> Did anybody think of (mis-)using Flink streaming as an alternative to
>> Apache Flume, just for ingesting data from Kafka (or other streaming
>> sources) into HDFS? Knowing that Flink can read from Kafka and write to
>> HDFS, I assume it should be possible, but is this a good idea?
>>
>>
>>
>> Flume basically is about consuming data from somewhere, peeking into each
>> record and then directing it to a specific directory/file in HDFS reliably.
>> I've seen there is a FlumeSink, but would it be possible to get the same
>> functionality with Flink alone?
>>
>>
>>
>> I've skimmed through the documentation and found the option to split the
>> output by key and the possibility to add multiple sinks. As I understand,
>> Flink programs are generally static, so it would not be possible to
>> add/remove sinks at runtime?
>>
>> So you would need to implement a custom sink directing the records to
>> different files based on a key (e.g. date)? Would it be difficult to
>> implement things like rolling outputs, etc.? Or is it better to just use
>> Flume?
>>
>>
>>
>> Best,
>>
>> Hans-Peter
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> ------------------------------
>>
>> The integrity of this message cannot be guaranteed on the Internet. The
>> company that sent this message cannot therefore be held liable for its
>> content nor attachments. Any unauthorized use or dissemination is
>> prohibited. If you are not the intended recipient of this message, then
>> please delete it and notify the sender.
>>
>
>
