From: uce@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Tue, 30 Jun 2015 10:16:12 -0000
Message-Id: <18ea0ab293504cdc96e44003929ada02@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [42/51] [partial] flink-web git commit: [hotfix] Manual build of docs

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/0.9/apis/python.html
----------------------------------------------------------------------
diff --git a/content/docs/0.9/apis/python.html b/content/docs/0.9/apis/python.html
new file mode 100644
index 0000000..ad0c507
--- /dev/null
+++ b/content/docs/0.9/apis/python.html
@@ -0,0 +1,764 @@
Apache Flink 0.9.0 Documentation: Python Programming Guide
+ + +
+
+

Python Programming Guide Beta

+ + + +

+ +

Analysis programs in Flink are regular programs that implement transformations on data sets (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for example write the data to (distributed) files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.

+ +

In order to create your own Flink program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.

+ + + +

Example Program

+ +

The following program is a complete, working example of WordCount. You can copy & paste the code to run it locally.

+ +
from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+if __name__ == "__main__":
+  env = get_environment()
+  data = env.from_elements("Who's there?",
+   "I think I hear them. Stand, ho! Who's there?")
+  
+  data \
+    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+  
+  env.execute(local=True)
+ +
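To make the data flow of the example easier to follow, here is a minimal sketch of the same three steps (flat_map, group_by, reduce_group) in plain Python. It does not use the Flink API at all; the variable names are chosen for illustration only, and the sorting step is merely how `itertools.groupby` needs its input, not a statement about how Flink groups data.

```python
# Plain-Python illustration of the WordCount data flow; this does not use Flink.
from itertools import groupby

lines = ["Who's there?", "I think I hear them. Stand, ho! Who's there?"]

# flat_map step: every line yields zero or more (count, word) tuples
pairs = [(1, word) for line in lines for word in line.lower().split()]

# group_by(1) step: bring tuples with the same word (field 1) together
pairs.sort(key=lambda t: t[1])
groups = groupby(pairs, key=lambda t: t[1])

# reduce_group step: sum the counts inside each group
word_counts = [(sum(count for count, _ in group), word) for word, group in groups]

for count, word in word_counts:
    print(count, word)
```

Note that punctuation stays attached to the words because the text is split on whitespace only, just as in the Flink example.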

Back to top

+ +

Program Skeleton

+ +

As we already saw in the example, Flink programs look like regular Python programs with an if __name__ == "__main__": block. Each program consists of the same basic parts:

+ +
    +
  1. Obtain an Environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of your computations, and
  5. Execute your program.
+ +

We will now give an overview of each of those steps, but please refer to the respective sections for more details.

+ +

The Environment is the basis for all Flink programs. You can obtain one with the following function from the flink.plan.Environment module:

+ +
get_environment()
+ +

For specifying data sources, the execution environment has several methods to read from files. To just read a text file as a sequence of lines, you can use:

+ +
env = get_environment()
+text = env.read_text("file:///path/to/file")
+ +

This will give you a DataSet on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

+ +

Once you have a DataSet, you can apply transformations to create a new DataSet, which you can then write to a file, transform again, or combine with other DataSets. You apply transformations by calling methods on DataSet with your own custom transformation function. For example, a map transformation looks like this:

+ +
data.map(lambda x: x*2, INT)
+ +

This will create a new DataSet by doubling every value in the original DataSet. For more information and a list of all the transformations, please refer to Transformations.

+ +

Once you have a DataSet that needs to be written to disk, you can call one of these methods on DataSet:

+ +
data.write_text("<file-path>", write_mode=Constants.NO_OVERWRITE)
+data.write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
+data.output()
+ +

The last method is only useful for developing/debugging on a local machine; it writes the contents of the DataSet to standard output. (Note that in a cluster, the result goes to the standard out stream of the cluster nodes and ends up in the .out files of the workers.) The first two do as their names suggest. Please refer to Data Sinks for more information on writing to files.

+ +

Once you have specified the complete program, you need to call execute on the Environment. This will either execute on your local machine or submit your program for execution on a cluster, depending on how Flink was started. You can force a local execution by using execute(local=True).

+ +

Back to top

+ +

Project setup

+ +

Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed among the cluster via HDFS when running a job.

+ +

The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.

+ +

Back to top

+ +

Lazy Evaluation

+ +

All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program’s plan. The operations are actually executed when one of the execute() methods is invoked on the Environment object. Whether the program is executed locally or on a cluster depends on the environment of the program.

+ +

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
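The idea of recording operations first and running them later can be sketched in a few lines of plain Python. The `LazyPlan` class below is purely hypothetical and is not part of the Flink API; it only illustrates that calling `map` adds to a plan, while the user function runs only once `execute()` is called.

```python
# Hypothetical mini "plan" that records operations and runs them on execute().
class LazyPlan(object):
    def __init__(self, source):
        self.source = source
        self.operations = []  # recorded here, not executed yet

    def map(self, func):
        self.operations.append(func)
        return self  # allow chaining, similar to DataSet methods

    def execute(self):
        result = list(self.source)
        for func in self.operations:
            result = [func(value) for value in result]
        return result


calls = []

def double(value):
    calls.append(value)  # side effect shows when the function really runs
    return value * 2

plan = LazyPlan([1, 2, 3]).map(double)
calls_before_execute = list(calls)  # still empty: nothing has run so far
result = plan.execute()
print(calls_before_execute, result)
```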

+ +

Back to top

+ +

Transformations

+ +

Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.

+ +

This section gives a brief overview of the available transformations. The transformations documentation has a full description of all transformations with examples.

+ +


+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Transformation    Description
Map +

Takes one element and produces one element.

+ +
data.map(lambda x: x * 2, INT)
+ +
FlatMap +

Takes one element and produces zero, one, or more elements.

+ +
data.flat_map(
+  lambda x, c: [(1, word) for word in x.lower().split()],
+  (INT, STRING))
+ +
MapPartition +

Transforms a parallel partition in a single function call. The function gets the partition as an `Iterator` and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism and previous operations.

+ +
data.map_partition(lambda x,c: [value * 2 for value in x], INT)
+ +
Filter +

Evaluates a boolean function for each element and retains those for which the function returns true.

+ +
data.filter(lambda x: x > 1000)
+ +
Reduce +

Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full data set, or on a grouped data set.

+ +
data.reduce(lambda x,y : x + y)
+ +
ReduceGroup +

Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.

+ +
class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+data.reduce_group(Adder(), (INT, STRING))
+ +
Join +

Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See keys on how to define join keys.

+ +
# In this case tuple fields are used as keys. 
+# "0" is the join field on the first tuple
+# "1" is the join field on the second tuple.
+result = input1.join(input2).where(0).equal_to(1)
+ +
CoGroup +

The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See keys on how to define coGroup keys.

+ +
data1.co_group(data2).where(0).equal_to(1)
+ +
Cross +

Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.

+ +
result = data1.cross(data2)
+ +
Union +

Produces the union of two data sets.

+ +
data.union(data2)
+ +
+ +
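For readers who want to check their intuition for the element-wise transformations in the table, the following sketch shows plain-Python counterparts on small lists. It is an analogy only and does not call the Flink API; the variable names and sample values are made up, and the way this sketch handles duplicates in the union says nothing about Flink itself.

```python
# Plain-Python analogies for some of the transformations; this is not Flink code.
from functools import reduce
from itertools import product

data = [1, 2, 3]
data2 = [3, 4]

mapped = [x * 2 for x in data]                     # Map: one element in, one out
flat_mapped = [y for x in data for y in range(x)]  # FlatMap: zero or more out per element
filtered = [x for x in data if x > 1]              # Filter: keep elements passing the test
reduced = reduce(lambda x, y: x + y, data)         # Reduce: combine two elements at a time
crossed = list(product(data, data2))               # Cross: all pairs of elements
unioned = data + data2                             # Union: this sketch keeps duplicates

print(mapped, flat_mapped, filtered, reduced, crossed, unioned)
```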

Back to Top

+ +

Specifying Keys

+ +

Some transformations (like Join or CoGroup) require that a key is defined on their argument DataSets, and other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are applied.
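As an illustration of why a join needs a key on both inputs, here is a small plain-Python sketch of an equi-join on tuple field positions. The `join` helper and the sample data are hypothetical and only mimic the idea behind `where(0).equal_to(1)`; they say nothing about how Flink executes a join internally.

```python
# Plain-Python sketch of an equi-join on tuple fields; this is not Flink code.
def join(left, right, left_key, right_key):
    # Index the right input by its key field
    index = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    # Pair every left element with all right elements that share its key
    return [(l, r) for l in left for r in index.get(l[left_key], [])]


input1 = [("a", 1), ("b", 2)]
input2 = [(10, "a"), (20, "a"), (30, "c")]

# Field 0 of the first input is matched against field 1 of the second input
result = join(input1, input2, 0, 1)
print(result)
```

Elements without a partner on the other side (here `("b", 2)` and `(30, "c")`) do not appear in the result of this sketch.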

+ +

A DataSet is grouped as follows:

+ +
reduced = data \
+  .group_by(<define key here>) \
+  .reduce_group(<do something>)
+ +

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data set types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to guide the grouping operator.

+ +

Define keys for Tuples

+ +

The simplest case is grouping a data set of Tuples on one or more fields of the Tuple:

+ +
reduced = data \
+  .group_by(0) \
+  .reduce_group(<do something>)
+ +

The data set is grouped on the first field of the tuples. The group-reduce function will thus receive groups of tuples with the same value in the first field.

+ +
grouped = data \
+  .group_by(0,1) \
+  .reduce(<do something>)
+ +

The data set is grouped on the composite key consisting of the first and the second fields; therefore, the reduce function will receive groups with the same value for both fields.
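The notion of "virtual" keys on tuple positions can be sketched in plain Python. The `group_by` helper below is a hypothetical stand-in, not the Flink method of the same name: it computes the key from the given field positions on the fly, so the elements themselves never have to be repacked into key-value pairs.

```python
# Plain-Python sketch of grouping on tuple field positions; this is not Flink code.
from collections import defaultdict

def group_by(data, *fields):
    groups = defaultdict(list)
    for element in data:
        key = tuple(element[f] for f in fields)  # key is computed, not stored
        groups[key].append(element)
    return dict(groups)


data = [("a", 1, 10), ("a", 1, 20), ("a", 2, 30), ("b", 1, 40)]

by_first = group_by(data, 0)         # single-field key
by_first_two = group_by(data, 0, 1)  # composite key of fields 0 and 1

print(sorted(by_first.keys()))
print(sorted(by_first_two.keys()))
```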

+ +

A note on nested Tuples: If you have a DataSet with a nested tuple, specifying group_by(<index of tuple>) will cause the system to use the full tuple as a key.

+ +

Back to top

+ + + +

Certain operations require user-defined functions. All of them accept lambda functions and rich functions as arguments.

+ +
data.filter(lambda x: x > 5)
+ +
class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data.filter(Filter())
+ +

Rich functions allow the use of imported functions, provide access to broadcast variables, can be parameterized using __init__(), and are the go-to option for complex functions. They are also the only way to define an optional combine function for a reduce operation.

+ +

Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return an iterable if the operation can return multiple values (that is, for all functions receiving a collector argument).
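The difference between the two styles can be sketched without Flink. In the snippet below, `FilterFunction` is a hypothetical stand-in for the real base class and `apply_filter` is an invented helper; the point is only that a function object can carry a parameter set in `__init__()`, while a lambda hard-codes its logic in one expression.

```python
# Hypothetical stand-ins; the real FilterFunction lives in the Flink package.
class FilterFunction(object):
    def filter(self, value):
        raise NotImplementedError


class GreaterThan(FilterFunction):
    def __init__(self, threshold):
        self.threshold = threshold  # parameter fixed when the object is created

    def filter(self, value):
        return value > self.threshold


def apply_filter(values, function):
    # Accept either a function object or a plain callable such as a lambda
    test = function.filter if isinstance(function, FilterFunction) else function
    return [v for v in values if test(v)]


values = [3, 5, 8, 13]
with_lambda = apply_filter(values, lambda x: x > 5)
with_rich = apply_filter(values, GreaterThan(5))
print(with_lambda, with_rich)
```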

+ +

Flink requires type information at the time when it prepares the program for execution (when the main method of the program is called). This is done by passing an exemplary object that has the desired type. This also holds for tuples.

+ +
(INT, STRING)
+ +

This denotes a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no braces are required.

+ +

There are a few Constants defined in flink.plan.Constants that allow this in a more readable fashion.
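To illustrate what "passing an exemplary object" means, the sketch below compares values against example objects by type. The values assigned to `INT` and `STRING` here are assumptions made for this illustration (the actual definitions in flink.plan.Constants may differ), and `matches` is a hypothetical helper, not part of the API.

```python
# Assumed example values; the real constants in flink.plan.Constants may differ.
INT = 1
STRING = "type"

def matches(example, value):
    """Return True if value has the same (possibly nested) type as example."""
    if isinstance(example, tuple):
        return (isinstance(value, tuple)
                and len(example) == len(value)
                and all(matches(e, v) for e, v in zip(example, value)))
    return type(value) is type(example)


print(matches((INT, STRING), (3, "word")))   # same shape and field types
print(matches((INT, STRING), ("word", 3)))   # fields in the wrong order
```

The example object only conveys a type; its concrete value (such as `1` or `"type"` in this sketch) carries no meaning.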

+ +

Back to top

+ +

Data Types

+ +

Flink’s Python API currently only supports primitive Python types (int, float, bool, string) and byte arrays.

+ +

Tuples/Lists

+ +

You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type or a further tuple, resulting in nested tuples.

+ +
word_counts = env.from_elements(("hello", 1), ("world",2))
+
+counts = word_counts.map(lambda x: x[1], INT)
+ +

When working with operators that require a key for grouping or matching records, tuples let you simply specify the positions of the fields to be used as key. You can specify more than one position to use composite keys (see Section Data Transformations).

+ +
word_counts \
+    .group_by(0) \
+    .reduce(MyReduceFunction())
+ +

Back to top

+ +

Data Sources

+ +

Data sources create the initial data sets, such as from files or from collections.

+ +

File-based:

+ +
    +
  • read_text(path) - Reads files line-wise and returns them as Strings.
  • read_csv(path, type) - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field types.
+ +

Collection-based:

+ +
    +
  • from_elements(*args) - Creates a data set from the elements passed as arguments.
+ +

Examples

+ +
env = get_environment()
+
+# read text file from local file system
+localLines = env.read_text("file:///path/to/my/textfile")
+
+# read text file from a HDFS running at nnHost:nnPort
+hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+# read a CSV file with three fields
+csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
+
+# create a set from some given elements
+values = env.from_elements("Foo", "bar", "foobar", "fubar")
+ +
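What a CSV source conceptually produces, namely one typed tuple per line, can be sketched with the standard library. The `parse_csv` helper is hypothetical and uses Python callables (`int`, `str`, `float`) as field types for simplicity, whereas the Flink API takes exemplary objects; the sample text is invented.

```python
# Plain-Python sketch of turning CSV text into typed tuples; this is not Flink code.
import csv
import io

def parse_csv(text, types, field_delimiter=","):
    reader = csv.reader(io.StringIO(text), delimiter=field_delimiter)
    # Convert each field with the callable at the same position in `types`
    return [tuple(convert(field) for convert, field in zip(types, row))
            for row in reader if row]


rows = parse_csv("1,hello,2.5\n2,world,4.0\n", (int, str, float))
print(rows)
```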

Back to top

+ +

Data Sinks

+ +

Data sinks consume DataSets and are used to store or return them:

+ +
    +
  • write_text() - Writes elements line-wise as Strings. The Strings are obtained by calling the str() method of each element.
  • write_csv(...) - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the str() method of the objects.
  • output() - Prints the str() value of each element on the standard out.
+ +

A DataSet can be input to multiple operations. Programs can write or print a data set and at the same time run additional transformations on it.

+ +

Examples

+ +

Standard data sink methods:

+ +
# write DataSet to a file on the local file system
+textData.write_text("file:///my/result/on/localFS")
+
+# write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+# write DataSet to a file and overwrite the file if it exists
+textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+# tuples as lines with pipe as the separator "a|b|c"
+values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
+
+# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
+values.write_text("file:///path/to/the/result/file")
+ +
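The difference between the CSV and the text formatting of tuples can be reproduced in plain Python. The two helpers below are hypothetical and write to strings rather than files; they only rely on the statement above that both sinks use `str()` on the values.

```python
# Plain-Python sketch of the two output formats; this is not Flink code.
values = [("a", "b", "c"), ("d", "e", "f")]

def as_csv_lines(tuples, line_delimiter="\n", field_delimiter=","):
    # str() is applied to every field, fields are joined by the delimiter
    return line_delimiter.join(
        field_delimiter.join(str(field) for field in t) for t in tuples)

def as_text_lines(elements):
    # str() is applied to every element as a whole
    return "\n".join(str(element) for element in elements)


csv_output = as_csv_lines(values, field_delimiter="|")
text_output = as_text_lines(values)
print(csv_output)
print(text_output)
```

In plain Python, `str()` of a tuple of strings includes the quotes around each field, so the text variant of this sketch prints `('a', 'b', 'c')`.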

Back to top

+ +

Broadcast Variables

+ +

Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.

+ +
    +
  • Broadcast: broadcast sets are registered by name via with_broadcast_set(String, DataSet)
  • Access: accessible via self.context.get_broadcast_variable(String) at the target operator
+ +
class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("bcv")[0][0]
+        return value * factor
+
+# 1. The DataSet to be broadcasted
+toBroadcast = env.from_elements(1, 2, 3) 
+data = env.from_elements("a", "b")
+
+# 2. Broadcast the DataSet
+data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast)
+ +

Make sure that the names (bcv in the previous example) match when registering and accessing broadcast data sets.

+ +

Note: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply parameterize the rich function.
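The following plain-Python sketch illustrates the concept: every parallel instance works on its own part of the regular input but sees the complete broadcast set. The `run_with_broadcast` helper and the sample data are hypothetical, and the instances run one after another here rather than in parallel.

```python
# Plain-Python sketch of the broadcast idea; this is not Flink code.
def run_with_broadcast(partitions, broadcast_values, function):
    results = []
    for partition in partitions:             # one iteration per parallel instance
        local_copy = list(broadcast_values)  # each instance gets the full set
        results.append([function(value, local_copy) for value in partition])
    return results


partitions = [[1, 2], [3, 4]]  # regular input, split across two instances
factors = [10]                 # small auxiliary data set to broadcast

scaled = run_with_broadcast(partitions, factors,
                            lambda value, bcv: value * bcv[0])
print(scaled)
```

Because each instance holds its own copy, the memory cost grows with the number of instances, which is why the broadcast set should stay small.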

+ +

Back to top

+ +

Parallel Execution

+ +

This section describes how the parallel execution of programs can be configured in Flink. A Flink program consists of multiple tasks (operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism or degree of parallelism (DOP).
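As a rough mental model, a task with a degree of parallelism of 3 can be pictured as three instances that each process one subset of the input. The sketch below uses a round-robin split, which is an assumption made for illustration; how Flink actually distributes elements depends on the operators involved.

```python
# Plain-Python sketch of splitting work across parallel instances; not Flink code.
def split_into_instances(elements, parallelism):
    # Round-robin assignment, chosen here only for illustration
    return [elements[i::parallelism] for i in range(parallelism)]


instances = split_into_instances(list(range(10)), 3)

# Each instance computes a partial result on its own subset
partial_sums = [sum(subset) for subset in instances]
total = sum(partial_sums)
print(instances, partial_sums, total)
```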

+ +

The degree of parallelism of a task can be specified in Flink on different levels.

+ +

Execution Environment Level

+ +

Flink programs are executed in the context of an execution environment. An execution environment defines a default parallelism for all operators, data sources, and data sinks it executes. Execution environment parallelism can be overridden by explicitly configuring the parallelism of an operator.

+ +

The default parallelism of an execution environment can be specified by calling the set_degree_of_parallelism() method. To execute all operators, data sources, and data sinks of the WordCount example program with a parallelism of 3, set the default parallelism of the execution environment as follows:

+ +
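env = get_environment()
+env.set_degree_of_parallelism(3)
+
+text = env.from_elements("Who's there?",
+  "I think I hear them. Stand, ho! Who's there?")
+
+text.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+env.execute()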
+ +

System Level

+ +

A system-wide default parallelism for all execution environments can be defined by setting the parallelization.degree.default property in ./conf/flink-conf.yaml. See the Configuration documentation for details.

+ +

Back to top

+ +

Executing Plans

+ +

To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder. Use pyflink2.sh for Python 2.7, and pyflink3.sh for Python 3.4. The script containing the plan has to be passed as the first argument, followed by a number of additional Python packages, and finally, separated by -, additional arguments that will be fed to the script.

+ +
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]]
+ +
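The argument layout described above (script first, then optional packages, then a single `-`, then script parameters) can be illustrated with a small parser. This is a hypothetical sketch of the layout only; it is not how the pyflink scripts process their arguments, and the file names are made up.

```python
# Hypothetical sketch of the documented argument layout; not the pyflink scripts.
def split_arguments(argv):
    script = argv[0]
    rest = argv[1:]
    if "-" in rest:
        separator = rest.index("-")
        # Everything before "-" is a package path, everything after a parameter
        return script, rest[:separator], rest[separator + 1:]
    return script, rest, []


script, packages, params = split_arguments(
    ["wordcount.py", "/path/to/pkg1", "/path/to/pkg2", "-", "input.txt", "3"])
print(script, packages, params)
```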

Back to top

+ +

Debugging

+ +

If you are running Flink programs locally, you can debug your program by following this guide. First, enable debugging by setting the debug switch in the env.execute(debug=True) call. After submitting your program, open the jobmanager log file and look for a line that says "Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>". Now open /tmp/flink in your Python IDE and run executor.py <port>.

+ +

Back to top

+ +
+ +
+ +
+
+
+ +
+ + + + + + + + + + + + + +