From: uce@apache.org
To: commits@flink.apache.org
Date: Tue, 30 Jun 2015 10:15:50 -0000
Subject: [20/51] [partial] flink-web git commit: [hotfix] Manual build of docs

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/0.9/setup/yarn_setup.html
----------------------------------------------------------------------
diff --git a/content/docs/0.9/setup/yarn_setup.html b/content/docs/0.9/setup/yarn_setup.html
new file mode 100644
index 0000000..e45d2c2
--- /dev/null
+++ b/content/docs/0.9/setup/yarn_setup.html

Apache Flink 0.9.0 Documentation: YARN Setup
YARN Setup


Start a YARN session with 4 Task Managers (each with 4 GB of heap space):

# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
tar xvzf flink-0.9.0-bin-hadoop2.tgz
cd flink-0.9.0/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

Specify the -s flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
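For example, on machines with 8 processor cores (the core count here is only illustrative), the session above could be started with:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 8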


Once the session has been started, you can submit jobs to the cluster using the ./bin/flink tool.

# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
tar xvzf flink-0.9.0-bin-hadoop2.tgz
cd flink-0.9.0/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-0.9.0-WordCount.jar

Apache Hadoop YARN is a cluster resource management framework. It allows running various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to set up or install anything if there is already a YARN setup.


Requirements

  • Apache Hadoop 2.2
  • HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)

If you have trouble using the Flink YARN client, have a look at the FAQ section.


Follow these instructions to learn how to launch a Flink Session within your YARN cluster.


A session will start all required Flink services (JobManager and TaskManagers) so that you can submit programs to the cluster. Note that you can run multiple programs per session.


Download the YARN tgz package from the download page. It contains the required files.


Extract the package using:

tar xvzf flink-0.9.0-bin-hadoop2.tgz
cd flink-0.9.0/

If you want to build the YARN .tgz file from sources, follow the build instructions. You can find the result of the build in flink-dist/target/flink-0.9.0-bin/flink-0.9.0/ (note: the version might be different for you).


Start a Session


Use the following command to start a session:

./bin/yarn-session.sh

This command will show you the following overview:

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -st,--streaming                 Start Flink in streaming mode
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]

Please note that the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to be set to read the YARN and HDFS configuration.
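For example, a minimal sketch (the configuration path is an assumption and depends on your Hadoop installation):

export HADOOP_CONF_DIR=/etc/hadoop/conf   # assumed path, adjust to your setup
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096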


Example: Issue the following command to allocate 10 Task Managers with 8 GB of memory and 32 processing slots each:

./bin/yarn-session.sh -n 10 -tm 8192 -s 32

The system will use the configuration in conf/flink-conf.yaml. Please follow our configuration guide if you want to change something.


Flink on YARN will overwrite the following configuration parameters: jobmanager.rpc.address (because the JobManager may be allocated on different machines), taskmanager.tmp.dirs (we are using the tmp directories given by YARN), and parallelism.default if the number of slots has been specified.


If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the -D flag. So you can pass parameters this way: -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368.
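For example, a session start passing both of the dynamic properties mentioned above (a sketch reusing the flags from the earlier examples):

./bin/yarn-session.sh -n 4 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368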


The -n 10 example invocation above starts 11 containers, since there is one additional container for the ApplicationMaster and JobManager.


Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.


Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client.


Detached YARN session


If you do not want to keep the Flink YARN client running all the time, it is also possible to start a detached YARN session. The parameter for that is called -d or --detached.


In that case, the Flink YARN client will only submit Flink to the cluster and then close itself. Note that in this case it is not possible to stop the YARN session using Flink.


Use the YARN utilities (yarn application -kill <appId>) to stop the YARN session.
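For example (the application ID below is hypothetical; look up the real one with yarn application -list):

# list running YARN applications to find the ID of the Flink session
yarn application -list
# stop the session
yarn application -kill application_1433121466593_0001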


Use the following command to submit a Flink program to the YARN cluster:

./bin/flink

Please refer to the documentation of the command-line client.


The command will show you a help menu like this:

[...]
Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action arguments:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method). Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -m,--jobmanager <host:port>      Address of the JobManager (master) to
                                      which to connect. Use this flag to connect
                                      to a different JobManager than the one
                                      specified in the configuration.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration

Use the run action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the -m argument. The JobManager address is visible in the YARN console.


Example

wget -O apache-license-v2.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal apache-license-v2.txt hdfs:/// ...
./bin/flink run ./examples/flink-java-examples-0.9.0-WordCount.jar \
        hdfs:///..../apache-license-v2.txt hdfs:///.../wordcount-result.txt

If you see the following error, make sure that all TaskManagers have started:

Exception in thread "main" org.apache.flink.compiler.CompilerException:
    Available instances could not be determined from job manager: Connection timed out.

You can check the number of TaskManagers in the JobManager web interface. The address of this interface is printed in the YARN session console.


If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.


The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.


Please note that the client then expects the -yn value to be set (number of TaskManagers).


Example:

./bin/flink run -m yarn-cluster -yn 2 ./examples/flink-java-examples-0.9.0-WordCount.jar

The command line options of the YARN session are also available with the ./bin/flink tool. They are prefixed with a y or yarn (for the long argument options).
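For example, the session flags -jm, -tm and -s shown earlier become -yjm, -ytm and -ys when used with ./bin/flink (a sketch; the slot count is illustrative):

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 -ys 8 ./examples/flink-java-examples-0.9.0-WordCount.jar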


Flink's YARN client has the following configuration parameters to control its behavior in case of container failures. These parameters can be set either in conf/flink-conf.yaml or, when starting the YARN session, using -D parameters (a sketch follows the list below).

  • yarn.reallocate-failed: This parameter controls whether Flink should reallocate failed TaskManager containers. Default: true
  • yarn.maximum-failed-containers: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManagers (-n).
  • yarn.application-attempts: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the ApplicationMaster fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
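A sketch of both ways to set these parameters (the values are only illustrative):

# in conf/flink-conf.yaml:
yarn.maximum-failed-containers: 10
yarn.application-attempts: 2

# or when starting the session:
./bin/yarn-session.sh -n 4 -Dyarn.maximum-failed-containers=10 -Dyarn.application-attempts=2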

Debugging a failed YARN session


There are many reasons why a Flink YARN session deployment can fail: a misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (such as running Flink with vanilla Hadoop dependencies on Cloudera Hadoop), or other errors.


Log Files


In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the YARN log aggregation. To enable it, users have to set the yarn.log-aggregation-enable property to true in the yarn-site.xml file. Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.

yarn logs -applicationId <application ID>
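For reference, the yarn-site.xml entry that enables log aggregation looks like this (standard Hadoop configuration syntax):

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>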

Note that it takes a few seconds after the session has finished until the logs show up.


YARN Client console & web interfaces


The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).


In addition to that, there is the YARN Resource Manager web interface (by default on port 8088). The port of the Resource Manager web interface is determined by the yarn.resourcemanager.webapp.address configuration value.


It allows access to log files of running YARN applications and shows diagnostics for failed apps.


Build YARN client for a specific Hadoop version


Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the build instructions for more details.


Background / Internals


This section briefly describes how Flink and YARN interact.


The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:

  • Test if YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH is set (in that order). If one of these variables is set, it is used to read the configuration.
  • If the above strategy fails (this should not be the case in a correct YARN setup), the client uses the HADOOP_HOME environment variable. If it is set, the client tries to access $HADOOP_HOME/etc/hadoop (Hadoop 2) and $HADOOP_HOME/conf (Hadoop 1).
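A shell sketch of the same lookup order (illustrative only, not the client's actual code):

# try the explicit configuration variables first, then the HADOOP_HOME fallbacks
for dir in "$YARN_CONF_DIR" "$HADOOP_CONF_DIR" "$HADOOP_CONF_PATH" \
           "$HADOOP_HOME/etc/hadoop" "$HADOOP_HOME/conf"; do
    if [ -n "$dir" ] && [ -d "$dir" ]; then
        echo "Reading Hadoop configuration from $dir"
        break
    fi
done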

When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).


The next step of the client is to request (step 2) a YARN container to start the ApplicationMaster (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the ApplicationMaster (AM) is started.


The JobManager and AM are running in the same container. Once they have successfully started, the AM knows the address of the JobManager (its own host). It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the AM container serves Flink's web interface. The ports Flink uses for its services are the standard ports configured by the user plus the application id as an offset. This allows users to execute multiple Flink YARN sessions in parallel.


After that, the AM starts allocating the containers for Flink's TaskManagers, which will download the jar file and the modified configuration from HDFS. Once these steps are completed, Flink is set up and ready to accept jobs.

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/README.md
----------------------------------------------------------------------
diff --git a/content/docs/master/README.md b/content/docs/master/README.md
new file mode 100644
index 0000000..206b04c
--- /dev/null
+++ b/content/docs/master/README.md

This README gives an overview of how to build and contribute to the documentation of Apache Flink.

The documentation is included with the source of Apache Flink in order to ensure that you always
have docs corresponding to your checked-out version. The online documentation at
http://flink.apache.org/ is also generated from the files found here.

# Requirements

We use Markdown to write and Jekyll to translate the documentation to static HTML. You can install
all needed software via:

    gem install jekyll
    gem install kramdown
    sudo easy_install Pygments

Kramdown is needed for Markdown processing and the Python-based Pygments is used for syntax
highlighting.

# Build

The `docs/build_docs.sh` script calls Jekyll and generates the documentation in `docs/target`. You
can then point your browser to `docs/target/index.html` and start reading.

If you call the script with the preview flag `build_docs.sh -p`, Jekyll will start a web server at
`localhost:4000` and watch the docs directory for updates. Use this mode to preview changes locally.

# Contribute

The documentation pages are written in
[Markdown](http://daringfireball.net/projects/markdown/syntax). It is possible to use the
[GitHub flavored syntax](http://github.github.com/github-flavored-markdown) and intermix plain HTML.

In addition to Markdown, every page contains a Jekyll front matter, which specifies the title of the
page and the layout to use. The title is used as the top-level heading for the page.

    ---
    title: "Title of the Page"
    ---

Furthermore, you can access variables found in `docs/_config.yml` as follows:

    {{ site.NAME }}

This will be replaced with the value of the variable called `NAME` when generating
the docs.

All documents are structured with headings. From these headings, a page outline is
automatically generated for each page.

```
# Level-1 Heading <- Used for the title of the page
## Level-2 Heading <- Start with this one
### Level-3 heading
#### Level-4 heading
##### Level-5 heading
```

Please stick to the "logical order" when using the headlines, e.g. start with level-2 headings and
use level-3 headings for subsections, etc. Don't use a different ordering just because you don't
like how a headline looks.

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/best_practices.html
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/best_practices.html b/content/docs/master/apis/best_practices.html
new file mode 100644
index 0000000..44b220b
--- /dev/null
+++ b/content/docs/master/apis/best_practices.html

Apache Flink 0.10-SNAPSHOT Documentation: Best Practices

Best Practices


This page contains a collection of best practices for Flink programmers on how to solve frequently encountered problems.


Almost all Flink applications, both batch and streaming, rely on external configuration parameters: for specifying input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application-specific parameters (often used within the user functions).


Since version 0.9 we provide a simple utility called ParameterTool that offers at least some basic tooling for solving these problems.


Please note that you don’t have to use the ParameterTool explained here. Other frameworks such as Commons CLI, argparse4j and others work well with Flink as well.


Getting your configuration values into the ParameterTool


The ParameterTool provides a set of predefined static methods for reading the configuration. The tool internally expects a Map<String, String>, so it's very easy to integrate it with your own configuration style.
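For example, assuming the fromMap factory method, you can wrap a map you built yourself (a sketch; the keys are illustrative):

Map<String, String> myConfig = new HashMap<>();
myConfig.put("input", "hdfs:///mydata");
myConfig.put("elements", "42");
ParameterTool parameter = ParameterTool.fromMap(myConfig);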


From .properties files


The following method will read a Properties file and provide the key/value pairs:

String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

From the command line arguments


This allows getting arguments like --input hdfs:///mydata --elements 42 from the command line.

public static void main(String[] args) {
	ParameterTool parameter = ParameterTool.fromArgs(args);
	// .. regular code ..
}

From system properties


When starting a JVM, you can pass system properties to it: -Dinput=hdfs:///mydata. You can also initialize the ParameterTool from these system properties:

ParameterTool parameter = ParameterTool.fromSystemProperties();

Now that we’ve got the parameters from somewhere (see above) we can use them in various ways.


Directly from the ParameterTool


The ParameterTool itself has methods for accessing the values.

ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. there are more methods available.

You can use the return values of these methods directly in the main() method (i.e., the client submitting the application). For example, you could set the parallelism of an operator like this:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.getInt("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

Since the ParameterTool is serializable, you can pass it to the functions themselves:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

and then use it inside the function for getting values from the command line.
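A sketch of such a Tokenizer that stores the passed ParameterTool in a field (the field and constructor are illustrative, not part of the built-in example):

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
	private final ParameterTool parameters;

	public Tokenizer(ParameterTool parameters) {
		this.parameters = parameters;
	}

	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		// read a value that was passed on the command line
		String input = parameters.getRequired("input");
		// .. tokenize and emit ..
	}
}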


Passing it as a Configuration object to single functions


The example below shows how to pass the parameters as a Configuration object to a user defined function.

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration());

In the Tokenizer, the object is now accessible in the open(Configuration conf) method:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void open(Configuration parameters) throws Exception {
		parameters.getInteger("myInt", -1);
		// .. do more ..
	}
}

Register the parameters globally

+ +

Parameters registered as global job parameters at the ExecutionConfig allow you to access the configuration values from the JobManager web interface and from all functions defined by the user.


Register the parameters globally:

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

Access them in any rich user function:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
		parameters.getRequired("input");
		// .. do more ..
	}
}

Naming large TupleX types


It is recommended to use POJOs (Plain Old Java Objects) instead of TupleX for data types with many fields. Also, POJOs can be used to give large Tuple-types a name.


Example


Instead of using:

Tuple11<String, String, ..., String> var = new ...;

It is much easier to create a custom type extending from the large Tuple type.

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}
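Such a subclass can also expose named accessors for its fields, which makes call sites easier to read (a sketch with hypothetical field names):

public static class Person extends Tuple2<String, Integer> {
    public String getName() { return this.f0; }
    public Integer getAge() { return this.f1; }
}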

If you use a custom type in your Flink program which cannot be serialized by the Flink type serializer, Flink falls back to using the generic Kryo serializer. You may register your own serializer or a serialization system like Google Protobuf or Apache Thrift with Kryo. To do that, simply register the type class and the serializer in the ExecutionConfig of your Flink program.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

Note that your custom serializer has to extend Kryo’s Serializer class. In the case of Google Protobuf or Apache Thrift, this has already been done for you:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

For the above example to work, you need to include the necessary dependencies in your Maven project file (pom.xml). In the dependency section, add the following for Apache Thrift:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-thrift</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.6.1</version>
	<exclusions>
		<exclusion>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
		</exclusion>
		<exclusion>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</exclusion>
	</exclusions>
</dependency>

For Google Protobuf you need the following Maven dependency:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>2.5.0</version>
</dependency>

Please adjust the versions of both libraries as needed.

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/cli.html
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/cli.html b/content/docs/master/apis/cli.html
new file mode 100644
index 0000000..bfe695e
--- /dev/null
+++ b/content/docs/master/apis/cli.html

Apache Flink 0.10-SNAPSHOT Documentation: Command-Line Interface

Command-Line Interface


Flink provides a command-line interface to run programs that are packaged as JAR files, and control their execution. The command line interface is part of any Flink setup, available in local single node setups and in distributed setups. It is located under <flink-home>/bin/flink and connects by default to the running Flink master (JobManager) that was started from the same installation directory.


A prerequisite to using the command line interface is that the Flink master (JobManager) has been started (via <flink-home>/bin/start-local.sh or <flink-home>/bin/start-cluster.sh) or that a YARN environment is available.


The command line can be used to run programs, show their optimized execution plans, list scheduled and running jobs, and cancel running jobs.


Examples

  • Run example program with no arguments:

        ./bin/flink run ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar

  • Run example program with arguments for input and result files:

        ./bin/flink run ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out

  • Run example program with parallelism 16 and arguments for input and result files:

        ./bin/flink run -p 16 ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out

  • Run example program on a specific JobManager:

        ./bin/flink run -m myJMHost:6123 \
                               ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out

  • Run example program with a specific class as an entry point:

        ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                               ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out

  • Run example program using a per-job YARN cluster with 2 TaskManagers:

        ./bin/flink run -m yarn-cluster -yn 2 \
                               ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                               hdfs:///user/hamlet.txt hdfs:///user/wordcount_out

  • Display the optimized execution plan for the WordCount example program as JSON:

        ./bin/flink info ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out

  • List scheduled and running jobs (including their JobIDs):

        ./bin/flink list

  • List scheduled jobs (including their JobIDs):

        ./bin/flink list -s

  • List running jobs (including their JobIDs):

        ./bin/flink list -r

  • Cancel a job:

        ./bin/flink cancel <jobID>

Usage


The command line syntax is as follows:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method). Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -m,--jobmanager <host:port>      Address of the JobManager (master) to
                                      which to connect. Specify 'yarn-cluster'
                                      as the JobManager to deploy a YARN cluster
                                      for the job. Use this flag to connect to a
                                      different JobManager than the one
                                      specified in the configuration.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.
  Additional arguments if -m yarn-cluster is set:
     -yD <arg>                            Dynamic properties
     -yd,--yarndetached                   Start detached
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                          MB]


Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method). Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -m,--jobmanager <host:port>      Address of the JobManager (master) to
                                      which to connect. Specify 'yarn-cluster'
                                      as the JobManager to deploy a YARN cluster
                                      for the job. Use this flag to connect to a
                                      different JobManager than the one
                                      specified in the configuration.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.


Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Specify 'yarn-cluster' as the
                                   JobManager to deploy a YARN cluster for the
                                   job. Use this flag to connect to a different
                                   JobManager than the one specified in the
                                   configuration.
     -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs


Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Specify 'yarn-cluster' as the
                                   JobManager to deploy a YARN cluster for the
                                   job. Use this flag to connect to a different
                                   JobManager than the one specified in the
                                   configuration.