Date: Wed, 4 Feb 2015 20:47:36 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.incubator.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-1396) Add hadoop input formats directly to the user API.
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 7bit
X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394
X-Virus-Checked: Checked by ClamAV on apache.org

    [ https://issues.apache.org/jira/browse/FLINK-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14305914#comment-14305914 ]

ASF GitHub Bot commented on FLINK-1396:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/363#discussion_r24117856

    --- Diff: docs/hadoop_compatibility.md ---
    @@ -52,56 +63,70 @@ Add the following dependency to your `pom.xml` to use the Hadoop Compatibility L
     
     ### Using Hadoop Data Types
     
    -Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency, if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
    +Flink supports all Hadoop `Writable` and `WritableComparable` data types
    +out-of-the-box. You do not need to include the Hadoop Compatibility dependency,
    +if you only want to use your Hadoop data types. See the
    +[Programming Guide](programming_guide.html#data-types) for more details.
     
     ### Using Hadoop InputFormats
     
    -Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
    -
    -Flink's InputFormat wrappers are
    -
    -- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and
    -- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
    +Hadoop input formats can be used to create a data source by using
    +on of the methods `readHadoopFile` or `createHadoopInput` of the
    +`ExecutionEnvironment`. The former is used for input formats derived
    +from `FileInputFormat` while the latter has to be used for general purpose
    +input formats.
     
    -and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
    +The resulting `DataSet` contains 2-tuples where the first field
    +is the key and the second field is the value retrieved from the Hadoop
    +InputFormat.
     
     The following example shows how to use Hadoop's `TextInputFormat`.
     
    +
+
    +
     ~~~java
     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    -
    -// Set up the Hadoop TextInputFormat.
    -Job job = Job.getInstance();
    -HadoopInputFormat hadoopIF =
    -  // create the Flink wrapper.
    -  new HadoopInputFormat(
    -    // create the Hadoop InputFormat, specify key and value type, and job.
    -    new TextInputFormat(), LongWritable.class, Text.class, job
    -  );
    -TextInputFormat.addInputPath(job, new Path(inputPath));
    -
    -// Read data using the Hadoop TextInputFormat.
    -DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
    +
    +DataSet<Tuple2<LongWritable, Text>> input =
    +  env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
     
     // Do something with the data.
     [...]
     ~~~
     
    -### Using Hadoop OutputFormats
    +
+
    -Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
    +~~~scala
    +val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +val input: DataSet[(LongWritable, Text)] =
    +  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
     
    -Flink's OUtputFormat wrappers are
    +// Do something with the data.
    +[...]
    +~~~
    +
    +
    -- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and
    -- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
    +
    +
    +
    +### Using Hadoop OutputFormats
     
    -and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
    +Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
    +that implements `org.apache.hadoop.mapred.OutputFormat` or extend
    --- End diff --

    extend -> extends


> Add hadoop input formats directly to the user API.
> --------------------------------------------------
>
>                 Key: FLINK-1396
>                 URL: https://issues.apache.org/jira/browse/FLINK-1396
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Robert Metzger
>            Assignee: Aljoscha Krettek
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)