Apache Flink 0.9.0 Documentation: Connecting to other systems

Connecting to other systems


Reading from file systems


Flink has built-in support for the following file systems:

Filesystem                             | Since          | Scheme     | Notes
Hadoop Distributed File System (HDFS)  | 0.2            | hdfs://    | All HDFS versions are supported
Amazon S3                              | 0.2            | s3://      |
MapR file system                       | 0.7-incubating | maprfs://  | The user has to manually place the required jar files in the lib/ dir
Tachyon                                | 0.9            | tachyon:// | Support through Hadoop file system implementation (see below)

Apache Flink allows users to use any file system implementing the org.apache.hadoop.fs.FileSystem interface. Hadoop ships adapters for FTP, Hftp, and others.


Flink has integrated test cases to validate the integration with Tachyon. Other file systems we have tested the integration with are the Google Cloud Storage Connector for Hadoop and XtreemFS.


In order to use a Hadoop file system with Flink, make sure that the flink-conf.yaml has the fs.hdfs.hadoopconf property set to the Hadoop configuration directory. In addition, the Hadoop configuration (in that directory) needs to have an entry for each supported file system. For example, for Tachyon support, there must be the following entry in the core-site.xml file:

<property>
  <name>fs.tachyon.impl</name>
  <value>tachyon.hadoop.TFS</value>
</property>
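For reference, the flink-conf.yaml entry mentioned above could look like the following sketch, assuming the Hadoop configuration directory is /path/to/etc/hadoop (the path is just a placeholder):

fs.hdfs.hadoopconf: /path/to/etc/hadoop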

Also, the required classes for using the file system need to be placed in the lib/ folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink also respects the HADOOP_CLASSPATH environment variable to add Hadoop jar files to the classpath.
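As a sketch of the HADOOP_CLASSPATH alternative (the jar location below is just a placeholder):

export HADOOP_CLASSPATH=/path/to/required-filesystem-client.jar:$HADOOP_CLASSPATH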


Connecting to other systems using Input / Output Format wrappers for Hadoop


Apache Flink allows users to access many different systems as data sources or sinks. The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept of so-called InputFormats and OutputFormats.


One implementation of these InputFormats is the HadoopInputFormat. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.
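For illustration, here is a minimal sketch of wrapping a plain Hadoop TextInputFormat this way; the input path is a placeholder and the job setup is kept to the essentials:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopTextInputExample {

  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // wrap Hadoop's TextInputFormat; keys are byte offsets, values are the text lines
    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopIF =
        new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
    // the input path below is just a placeholder
    TextInputFormat.addInputPath(job, new Path("hdfs:///path/to/input"));

    // turn the Hadoop input format into a Flink DataSet
    DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);

    lines.print();
  }
}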


This section shows some examples for connecting Flink to other systems. Read more about Hadoop compatibility in Flink.

Avro support in Flink

Flink has extensive built-in support for Apache Avro. This makes it easy to read from Avro files with Flink. Also, Flink's serialization framework is able to handle classes generated from Avro schemas.


In order to read data from an Avro file, you have to specify an AvroInputFormat.


Example:

AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
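In this snippet, in is the location of the Avro file and env is the ExecutionEnvironment. As a sketch (the file location is a placeholder, and Path refers to org.apache.flink.core.fs.Path):

Path in = new Path("hdfs:///path/to/users.avro");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();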

Note that User is a POJO generated by Avro. Flink also allows string-based key selection on these POJOs. For example:

usersDS.groupBy("name")

Note that using the GenericData.Record type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data intensive and thus probably slow to use.


Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type Object, you cannot use the field as a join or grouping key. Specifying a field in Avro like {"name": "type_double_test", "type": "double"} works fine; however, specifying it as a UNION type with only one field ({"name": "type_double_test", "type": ["double"]}) will generate a field of type Object. Note that specifying nullable types ({"name": "type_double_test", "type": ["null", "double"]}) is possible!
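To make the three variants concrete, here is a sketch of an Avro record schema combining them (the record and field names are made up for illustration):

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type_double_test", "type": "double"},
    {"name": "union_double", "type": ["double"]},
    {"name": "nullable_double", "type": ["null", "double"]}
  ]
}

Here, type_double_test and nullable_double can be used as grouping or join keys, while union_double is generated as a field of type Object and cannot.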


Access Microsoft Azure Table Storage


Note: This example works starting from Flink 0.6-incubating


This example uses the HadoopInputFormat wrapper with an existing Hadoop input format implementation for accessing Azure's Table Storage.

1. Download and compile the azure-tables-hadoop project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves. Execute the following commands:

       git clone https://github.com/mooso/azure-tables-hadoop.git
       cd azure-tables-hadoop
       mvn clean install

2. Set up a new Flink project using the quickstarts:

       curl http://flink.apache.org/q/quickstart.sh | bash

3. Add the following dependencies (in the <dependencies> section) to your pom.xml file:

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-hadoop-compatibility</artifactId>
           <version>0.9.0</version>
       </dependency>
       <dependency>
           <groupId>com.microsoft.hadoop</groupId>
           <artifactId>microsoft-hadoop-azure</artifactId>
           <version>0.0.4</version>
       </dependency>

   flink-hadoop-compatibility is a Flink package that provides the Hadoop input format wrappers. microsoft-hadoop-azure adds the project we built before to our project.

The project is now prepared for starting to code. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project!). Browse to the code of the Job.java file. It is an empty skeleton for a Flink job.


Paste the following code into it:

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.hadoop.azure.AzureTableConfiguration;
import com.microsoft.hadoop.azure.AzureTableInputFormat;
import com.microsoft.hadoop.azure.WritableEntity;
import com.microsoft.windowsazure.storage.table.EntityProperty;

public class AzureTableExample {

  public static void main(String[] args) throws Exception {
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // create an AzureTableInputFormat, using a Hadoop input format wrapper
    HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());

    // set the Account URI, something like: https://apacheflink.table.core.windows.net
    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
    // set the secret storage key here
    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
    // set the table name here
    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");

    DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
    // a little example of how to use the data in a mapper
    DataSet<String> fin = input.map(new MapFunction<Tuple2<Text, WritableEntity>, String>() {
      @Override
      public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
        System.err.println("--------------------------------\nKey = " + arg0.f0);
        WritableEntity we = arg0.f1;

        for (Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
          System.err.println("key=" + prop.getKey() + " ; value (asString)=" + prop.getValue().getValueAsString());
        }

        return arg0.f0.toString();
      }
    });

    // emit result (this works only locally)
    fin.print();

    // execute program
    env.execute("Azure Example");
  }
}

The example shows how to access an Azure table and turn data into Flink's DataSet (more specifically, the type of the set is DataSet<Tuple2<Text, WritableEntity>>). With the DataSet, you can apply all known transformations to it.
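For instance, a sketch of a further transformation, continuing from the input DataSet created in the code above (it simply counts one record per Azure table key):

DataSet<Tuple2<String, Integer>> counts = input
    .map(new MapFunction<Tuple2<Text, WritableEntity>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(Tuple2<Text, WritableEntity> entity) throws Exception {
        // the key (f0) becomes the grouping field, 1 is the count contribution
        return new Tuple2<String, Integer>(entity.f0.toString(), 1);
      }
    })
    .groupBy(0)
    .sum(1);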


Access MongoDB


This GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).
