flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Hadoop ETLing with Flink
Date Mon, 20 Apr 2015 14:52:33 GMT
For clarification:
I wrote "Right now, there is only support to read from HCatalog tables, but
not to write data to existing tables or create new ones.".

I think this is not correct.

You should be able to write to HCatalog using the regular
HadoopOutputFormat wrapper as described in the "loose integration" option.
Although, I think this has not been tried before. So it would be nice to
know whether it actually works or not.

2015-04-20 16:44 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> I agree, that looks very much like a common use case.
> Right now, there is only support to read from HCatalog tables, but not to
> write data to existing tables or create new ones.
>
> Would be a very nice feature to add, IMO.
>
> My guess (without having closely looked at the Hadoop HCatOutputFormat) is
> that this can be done in two ways.
>
> 1) Loose integration:
> - Use a Mapper to convert any data into a HCatRecord
> - Emit the HCatRecord to a HCatOutputFormat wrapped into a Flink
> HadoopOutputFormat.
> This requires custom logic for the Mapper, but is the most versatile
> solution and should more or less work out-of-the-box.
>
> 2) Tighter integration:
> - Make a special Flink HCatOutputFormat that wraps the Hadoop
> HCatOutputFormat but which offers special support to convert Flink Tuples
> into HCatRecords.
> This solution requires to check the input type of the output format
> against the HCat table schema and code for the translation to HCatRecords.
> This is of course nicer, if you want to emit Tuple data but more work to
> implement.
>
> Best, Fabian
>
> 2015-04-20 16:24 GMT+02:00 Papp, Stefan <Stefan.Papp@teradata.com>:
>
>> Hi,
>>
>>
>> Lets take Pig as an example...
>>
>>         collection = LOAD 'test_data.csv' USING PigStorage(';')
>>         AS (
>>                 col1:chararray,
>>                 col2:chararray,
>>         );
>>
>>         # use partitions
>>         STORE collection INTO 'import_table_hcat' USING
>> org.apache.hcatalog.pig.HCatStorer('datestamp=20150420');
>>
>> How would I implement this with Flink?
>>
>> Let us brainstorm about the code snippet...
>>
>>         final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>         CsvReader csvr = env.readCsvFile(filePath);
>>
>>         // TODO: Get data into a data set - How to read the whole file?
>>         // DataSet<Tuple2<Text, Text>> hadoopResult = csvr.
>>
>>
>>         // TODO: Store data into Hadoop - Write to HDFS / HCatalog
>>         // HadoopOutputFormat<Text, IntWritable> hadoopOF =
>>                                                   // create the Flink
>> wrapper.
>>                                                   new
>> HadoopOutputFormat<Text, IntWritable>(
>>                                                     // set the Hadoop
>> OutputFormat and specify the job.
>>                                                     new
>> TextOutputFormat<Text, IntWritable>(), job
>>                                                   );
>>
>> hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator",
>> " ");
>>
>> TextOutputFormat.setOutputPath(job, new Path(outputPath));
>>
>>                                                 // Emit data using the
>> Hadoop TextOutputFormat.
>>
>> hadoopResult.output(hadoopOF);
>>
>>
>> My idea is: If I create the tables in HCatalog in advance, I might add
>> them by writing to HDFS Hive directory. Any thoughts on this?
>>
>> Stefan
>>
>>
>>
>> -----Original Message-----
>> From: Robert Metzger [mailto:rmetzger@apache.org]
>> Sent: Monday, April 20, 2015 3:22 PM
>> To: dev@flink.apache.org
>> Subject: Re: Hadoop ETLing with Flink
>>
>> Hi Stefan,
>>
>> you can use Flink to load data into HDFS.
>> The CSV reader is suited for reading delimiter separated text files into
>> the system. But you can also read data from a lot of other sources (avro,
>> jdbc, mongodb, hcatalog).
>>
>> We don't have any utilities to make writing to HCatalog very easy, but
>> you can certainly write to HCatalog with Flink's Hadoop OutputFormat
>> wrappers:
>>
>> http://ci.apache.org/projects/flink/flink-docs-master/hadoop_compatibility.html#using-hadoop-outputformats
>>
>> Here is some documentation on how to use the Hcatalog output format:
>> https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput
>>
>> You probably have to do something like:
>>
>> HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName,
>> outputTableName, null)); HCatSchema s =
>> HCatOutputFormat.getTableSchema(job);
>> HCatOutputFormat.setSchema(job, s);
>>
>>
>>
>> Let me know if you need more help writing to Hcatalog.
>>
>>
>>
>>
>> On Mon, Apr 20, 2015 at 1:29 PM, Papp, Stefan <Stefan.Papp@teradata.com>
>> wrote:
>>
>> > Hi,
>> >
>> >
>> > I want  load CSV files into a Hadoop cluster. How could I do that with
>> > Flink?
>> >
>> > I know, I can load data into a CsvReader and then iterate over rows
>> > and transform them. Is there an easy way to store the results into
>> > HDFS+HCatalog within Flink?
>> >
>> > Thank you!
>> >
>> > Stefan Papp
>> > Lead Hadoop Consultant
>> >
>> > Teradata GmbH
>> > Mobile: +43 664 22 08 616
>> > stefan.papp@teradata.com<mailto:stefan.papp@teradata.com>
>> > teradata.com<http://www.teradata.com/>
>> >
>> > This e-mail is from Teradata Corporation and may contain information
>> > that is confidential or proprietary. If you are not the intended
>> > recipient, do not read, copy or distribute the e-mail or any
>> > attachments. Instead, please notify the sender and delete the e-mail
>> and any attachments. Thank you.
>> > Please consider the environment before printing.
>> >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message