flink-commits mailing list archives

From fhue...@apache.org
Subject [2/4] flink git commit: [FLINK-1461][api-extending] Add SortPartition operator to Java and Scala APIs.
Date Fri, 20 Feb 2015 15:11:48 GMT
[FLINK-1461][api-extending] Add SortPartition operator to Java and Scala APIs.

This closes #381


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d849703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d849703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d849703

Branch: refs/heads/master
Commit: 3d84970364ced41d1497269dc3c9d0b5835f9e1e
Parents: f0a28bf
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Feb 10 18:35:13 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Feb 20 16:10:22 2015 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |  35 +++
 docs/programming_guide.md                       |  26 ++
 .../org/apache/flink/compiler/PactCompiler.java |   5 +
 .../flink/compiler/dag/SortPartitionNode.java   | 127 ++++++++
 .../base/SortPartitionOperatorBase.java         |  88 ++++++
 .../java/org/apache/flink/api/java/DataSet.java |  31 +-
 .../flink/api/java/SortPartitionOperator.java   | 182 +++++++++++
 .../org/apache/flink/api/scala/DataSet.scala    |  23 +-
 .../javaApiOperators/SortPartitionITCase.java   | 305 +++++++++++++++++++
 .../util/CollectionDataSets.java                |   2 -
 10 files changed, 820 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index 223d24c..13082c1 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -1224,6 +1224,41 @@ val out = in.partitionByHash(0).mapPartition { ... }
 </div>
 </div>
 
+### Sort Partition
+
+Locally sorts all partitions of a DataSet on a specified field in a specified order.
+Fields can be specified as field expressions or field positions (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
+Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+DataSet<Tuple2<String, Integer>> in = // [...]
+// Locally sort partitions in ascending order on the second Integer field and
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
+                                         .sortPartition(0, Order.DESCENDING)
+                                         .mapPartition(new PartitionMapper());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val in: DataSet[(String, Int)] = // [...]
+// Locally sort partitions in ascending order on the second Int field and
+// in descending order on the first String field.
+// Apply a MapPartition transformation on the sorted partitions.
+val out = in.sortPartition(1, Order.ASCENDING)
+            .sortPartition(0, Order.DESCENDING)
+            .mapPartition { ... }
+~~~
+
+</div>
+</div>
+
 ### First-n
 
 Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on a regular DataSet, a grouped DataSet, or a grouped-sorted DataSet. Grouping keys can be specified as key-selector functions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).

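The examples in this hunk use field positions; since the new section also allows field expressions, here is a minimal, self-contained sketch of the same chained sort with expression keys (class name, data values, and output path are illustrative assumptions, not part of this commit):

~~~java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortPartitionFieldExpressionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 2),
                new Tuple2<String, Integer>("c", 1));

        // Same chained sort as in the docs example, expressed with field expressions.
        in.sortPartition("f1", Order.ASCENDING)
          .sortPartition("f0", Order.DESCENDING)
          .writeAsText("/tmp/sort-partition-expressions"); // assumed output path
        env.execute();
    }
}
~~~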
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 3dcd770..efedc1b 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -653,6 +653,19 @@ DataSet<Integer> result = in.partitionByHash(0)
       </td>
     </tr>
     <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in a specified order.
+          Fields can be specified as tuple positions or field expressions. 
+          Sorting on multiple fields is done by chaining sortPartition() calls.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
       <td><strong>First-n</strong></td>
       <td>
         <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p>
@@ -869,6 +882,19 @@ val result = in.partitionByHash(0).mapPartition { ... }
 {% endhighlight %}
       </td>
     </tr>
+    </tr>
+    <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in a specified order.
+          Fields can be specified as tuple positions or field expressions. 
+          Sorting on multiple fields is done by chaining sortPartition() calls.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
+{% endhighlight %}
+      </td>
+    </tr>
     <tr>
       <td><strong>First-n</strong></td>
       <td>

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 6d6bcc7..a82bb74 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.compiler.dag.SortPartitionNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -714,6 +716,9 @@ public class PactCompiler {
 			else if (c instanceof PartitionOperatorBase) {
 				n = new PartitionNode((PartitionOperatorBase<?>) c);
 			}
+			else if (c instanceof SortPartitionOperatorBase) {
+				n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
+			}
 			else if (c instanceof PartialSolutionPlaceHolder) {
 				if (this.parent == null) {
 				throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
new file mode 100644
index 0000000..dc16f50
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SortPartitionNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer's internal representation of a <i>SortPartition</i> operator node.
+ */
+public class SortPartitionNode extends SingleInputNode {
+
+	private final List<OperatorDescriptorSingle> possibleProperties;
+
+	public SortPartitionNode(SortPartitionOperatorBase<?> operator) {
+		super(operator);
+		
+		OperatorDescriptorSingle descr = new SortPartitionDescriptor(operator.getPartitionOrdering());
+		this.possibleProperties = Collections.singletonList(descr);
+	}
+
+	@Override
+	public SortPartitionOperatorBase<?> getPactContract() {
+		return (SortPartitionOperatorBase<?>) super.getPactContract();
+	}
+
+	@Override
+	public String getName() {
+		return "Sort-Partition";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// sorting does not change the number of records
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+	}
+	
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class SortPartitionDescriptor extends OperatorDescriptorSingle {
+
+		private Ordering partitionOrder;
+
+		public SortPartitionDescriptor(Ordering partitionOrder) {
+			this.partitionOrder = partitionOrder;
+		}
+		
+		@Override
+		public DriverStrategy getStrategy() {
+			return DriverStrategy.UNARY_NO_OP;
+		}
+
+		@Override
+		public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+			return new SingleInputPlanNode(node, "Sort-Partition", in, DriverStrategy.UNARY_NO_OP);
+		}
+
+		@Override
+		protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+			// sort partition does not require any global property
+			return Collections.singletonList(new RequestedGlobalProperties());
+		}
+
+		@Override
+		protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+			// set partition order as required local property
+			RequestedLocalProperties rlp = new RequestedLocalProperties();
+			rlp.setOrdering(this.partitionOrder);
+
+			return Collections.singletonList(rlp);
+		}
+		
+		@Override
+		public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+			// sort partition is a no-op operation, so all global properties are preserved.
+			return gProps;
+		}
+		
+		@Override
+		public LocalProperties computeLocalProperties(LocalProperties lProps) {
+			// sort partition is a no-op operation, so all local properties are preserved.
+			return lProps;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
new file mode 100644
index 0000000..6fe237d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/SortPartitionOperatorBase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * @param <IN> The input and result type.
+ */
+public class SortPartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> {
+
+	private final Ordering partitionOrdering;
+
+
+	public SortPartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, Ordering partitionOrdering, String name) {
+		super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, name);
+		this.partitionOrdering = partitionOrdering;
+	}
+
+	public Ordering getPartitionOrdering() {
+		return partitionOrdering;
+	}
+
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+
+		TypeInformation<IN> inputType = getInput().getOperatorInfo().getOutputType();
+
+		int[] sortColumns = this.partitionOrdering.getFieldPositions();
+		boolean[] sortOrderings = this.partitionOrdering.getFieldSortDirections();
+
+		final TypeComparator<IN> sortComparator;
+		if (inputType instanceof CompositeType) {
+			sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+		} else if (inputType instanceof AtomicType) {
+			sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0], executionConfig);
+		} else {
+			throw new UnsupportedOperationException("Partition sorting does not support type "+inputType+" yet.");
+		}
+
+		Collections.sort(inputData, new Comparator<IN>() {
+			@Override
+			public int compare(IN o1, IN o2) {
+				return sortComparator.compare(o1, o2);
+			}
+		});
+
+		return inputData;
+	}
+}

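The executeOnCollections() method above is what runs when a program executes in collection-based mode, so the new operator also works without a cluster. A hedged sketch, assuming the collection environment factory of this Flink version (data values are made up):

~~~java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortPartitionCollectionSketch {
    public static void main(String[] args) throws Exception {
        // Collection execution runs with a single partition, so the local sort
        // below is effectively a total sort, evaluated via executeOnCollections().
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3),
                new Tuple2<String, Integer>("c", 1));

        in.sortPartition(1, Order.DESCENDING)
          .print();      // print() is a data sink in this Flink version
        env.execute();   // triggers the program
    }
}
~~~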
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index f2091e2..327a15a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
@@ -1086,7 +1087,35 @@ public abstract class DataSet<T> {
 	public PartitionOperator<T> rebalance() {
 		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName());
 	}
-		
+
+	// --------------------------------------------------------------------------------------------
+	//  Sorting
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Locally sorts the partitions of the DataSet on the specified field in the specified order.
+	 * DataSet can be sorted on multiple fields by chaining sortPartition() calls.
+	 *
+	 * @param field The field index on which the DataSet is sorted.
+	 * @param order The order in which the DataSet is sorted.
+	 * @return The DataSet with sorted local partitions.
+	 */
+	public SortPartitionOperator<T> sortPartition(int field, Order order) {
+		return new SortPartitionOperator<T>(this, field, order, Utils.getCallLocationName());
+	}
+
+	/**
+	 * Locally sorts the partitions of the DataSet on the specified field in the specified order.
+	 * DataSet can be sorted on multiple fields by chaining sortPartition() calls.
+	 *
+	 * @param field The field expression referring to the field on which the DataSet is sorted.
+	 * @param order The order in which the DataSet is sorted.
+	 * @return The DataSet with sorted local partitions.
+	 */
+	public SortPartitionOperator<T> sortPartition(String field, Order order) {
+		return new SortPartitionOperator<T>(this, field, order, Utils.getCallLocationName());
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Top-K
 	// --------------------------------------------------------------------------------------------

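Because both new methods return a SortPartitionOperator<T>, a second sortPartition() call appends another key to the same local sort instead of stacking a second operator. A short usage sketch (class name, data values, and output path are illustrative assumptions):

~~~java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple3;

public class SortPartitionChainingSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<Integer, Long, String>> ds = env.fromElements(
                new Tuple3<Integer, Long, String>(1, 10L, "a"),
                new Tuple3<Integer, Long, String>(2, 10L, "b"),
                new Tuple3<Integer, Long, String>(3, 20L, "c"));

        // Both calls configure the same operator: f1 is the primary key, f0 the secondary.
        SortPartitionOperator<Tuple3<Integer, Long, String>> sorted =
                ds.sortPartition(1, Order.ASCENDING)
                  .sortPartition(0, Order.DESCENDING);

        sorted.writeAsText("/tmp/sorted-partitions"); // assumed output path
        env.execute("sortPartition chaining sketch");
    }
}
~~~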
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
new file mode 100644
index 0000000..7c09518
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.SingleInputOperator;
+
+import java.util.Arrays;
+
+/**
+ * This operator represents a DataSet with locally sorted partitions.
+ *
+ * @param <T> The type of the DataSet with locally sorted partitions.
+ */
+public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPartitionOperator<T>> {
+
+	private int[] sortKeyPositions;
+
+	private Order[] sortOrders;
+
+	private final String sortLocationName;
+
+
+	public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order sortOrder, String sortLocationName) {
+		super(dataSet, dataSet.getType());
+		this.sortLocationName = sortLocationName;
+
+		int[] flatOrderKeys = getFlatFields(sortField);
+		this.appendSorting(flatOrderKeys, sortOrder);
+	}
+
+	public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrder, String sortLocationName) {
+		super(dataSet, dataSet.getType());
+		this.sortLocationName = sortLocationName;
+
+		int[] flatOrderKeys = getFlatFields(sortField);
+		this.appendSorting(flatOrderKeys, sortOrder);
+	}
+
+	/**
+	 * Appends an additional sort order with the specified field in the specified order to the
+	 * local partition sorting of the DataSet.
+	 *
+	 * @param field The field index of the additional sort order of the local partition sorting.
+	 * @param order The order of the additional sort order of the local partition sorting.
+	 * @return The DataSet with sorted local partitions.
+	 */
+	public SortPartitionOperator<T> sortPartition(int field, Order order) {
+
+		int[] flatOrderKeys = getFlatFields(field);
+		this.appendSorting(flatOrderKeys, order);
+		return this;
+	}
+
+	/**
+	 * Appends an additional sort order with the specified field in the specified order to the
+	 * local partition sorting of the DataSet.
+	 *
+	 * @param field The field expression referring to the field of the additional sort order of
+	 *                 the local partition sorting.
+	 * @param order The order of the additional sort order of the local partition sorting.
+	 * @return The DataSet with sorted local partitions.
+	 */
+	public SortPartitionOperator<T> sortPartition(String field, Order order) {
+		int[] flatOrderKeys = getFlatFields(field);
+		this.appendSorting(flatOrderKeys, order);
+		return this;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Key Extraction
+	// --------------------------------------------------------------------------------------------
+
+	private int[] getFlatFields(int field) {
+
+		Keys.ExpressionKeys<T> ek;
+		try {
+			ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType());
+		} catch(IllegalArgumentException iae) {
+			throw new InvalidProgramException("Invalid specification of field expression.", iae);
+		}
+		return ek.computeLogicalKeyPositions();
+	}
+
+	private int[] getFlatFields(String fields) {
+
+		if(super.getType() instanceof CompositeType) {
+			// compute flat field positions for (nested) sorting fields
+			Keys.ExpressionKeys<T> ek;
+			try {
+				ek = new Keys.ExpressionKeys<T>(new String[]{fields}, super.getType());
+			} catch(IllegalArgumentException iae) {
+				throw new InvalidProgramException("Invalid specification of field expression.", iae);
+			}
+			return ek.computeLogicalKeyPositions();
+		} else {
+
+			fields = fields.trim();
+			if (!(fields.equals("*") || fields.equals("_"))) {
+				throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " +
+						"Use a field wildcard for that (\"*\" or \"_\")");
+			} else {
+				return new int[]{0};
+			}
+		}
+	}
+
+	private void appendSorting(int[] flatOrderFields, Order order) {
+
+		if(this.sortKeyPositions == null) {
+			// set sorting info
+			this.sortKeyPositions = flatOrderFields;
+			this.sortOrders = new Order[flatOrderFields.length];
+			Arrays.fill(this.sortOrders, order);
+		} else {
+			// append sorting info to existing info
+			int oldLength = this.sortKeyPositions.length;
+			int newLength = oldLength + flatOrderFields.length;
+			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
+			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
+
+			for(int i=0; i<flatOrderFields.length; i++) {
+				this.sortKeyPositions[oldLength+i] = flatOrderFields[i];
+				this.sortOrders[oldLength+i] = order;
+			}
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Translation
+	// --------------------------------------------------------------------------------------------
+
+	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
+
+		String name = "Sort at " + sortLocationName;
+
+		Ordering partitionOrdering = new Ordering();
+		for (int i = 0; i < this.sortKeyPositions.length; i++) {
+			partitionOrdering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
+		}
+
+		// create the operator information and the sort-partition data flow operator
+		UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
+		SortPartitionOperatorBase<T> noop = new SortPartitionOperatorBase<T>(operatorInfo, partitionOrdering, name);
+		noop.setInput(input);
+		if(this.getParallelism() < 0) {
+			// use parallelism of input if not explicitly specified
+			noop.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		} else {
+			// use explicitly specified parallelism
+			noop.setDegreeOfParallelism(this.getParallelism());
+		}
+
+		return noop;
+
+	}
+
+}

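To make the translation step above concrete: the keys and orders collected across chained sortPartition() calls end up as a single Ordering on one SortPartitionOperatorBase. A hedged standalone sketch of the Ordering that translateToDataFlow() builds for sortPartition(1, ASCENDING).sortPartition(0, DESCENDING), using only classes visible in this diff (the exact printed output is an assumption):

~~~java
import java.util.Arrays;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;

public class SortPartitionOrderingSketch {
    public static void main(String[] args) {
        // Two chained sortPartition() calls become one composite Ordering.
        Ordering ord = new Ordering();
        ord.appendOrdering(1, null, Order.ASCENDING);
        ord.appendOrdering(0, null, Order.DESCENDING);

        // Flat key positions in sort priority order, plus the derived boolean sort directions.
        System.out.println(Arrays.toString(ord.getFieldPositions()));      // expected: [1, 0]
        System.out.println(Arrays.toString(ord.getFieldSortDirections())); // ascending flags per key
    }
}
~~~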
http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index c1796ee..8a1dc41 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
@@ -30,7 +31,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.java.{DataSet => JavaDataSet, SortPartitionOperator}
 import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileSystem, Path}
@@ -1135,6 +1136,26 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   // --------------------------------------------------------------------------------------------
+  //  Partition Sorting
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Locally sorts the partitions of the DataSet on the specified field in the specified order.
+   * The DataSet can be sorted on multiple fields by chaining sortPartition() calls.
+   */
+  def sortPartition(field: Int, order: Order): DataSet[T] = {
+    wrap (new SortPartitionOperator[T](javaSet, field, order, getCallLocationName()))
+  }
+
+  /**
+   * Locally sorts the partitions of the DataSet on the specified field in the specified order.
+   * The DataSet can be sorted on multiple fields by chaining sortPartition() calls.
+   */
+  def sortPartition(field: String, order: Order): DataSet[T] = {
+    wrap (new SortPartitionOperator[T](javaSet, field, order, getCallLocationName()))
+  }
+
+  // --------------------------------------------------------------------------------------------
   //  Result writing
   // --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
new file mode 100644
index 0000000..790b7ba
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+@RunWith(Parameterized.class)
+public class SortPartitionITCase extends MultipleProgramsTestBase {
+
+	public SortPartitionITCase(ExecutionMode mode){
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSortPartitionByKeyField() throws Exception {
+		/*
+		 * Test sort partition on key field
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(4) // parallelize input
+				.sortPartition(1, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByTwoKeyFields() throws Exception {
+		/*
+		 * Test sort partition on two key fields
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(2) // parallelize input
+				.sortPartition(4, Order.ASCENDING)
+				.sortPartition(2, Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on field expression
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(4);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(4) // parallelize input
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByTwoFieldExpressions() throws Exception {
+		/*
+		 * Test sort partition on two field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(2);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(2) // parallelize input
+				.sortPartition("f4", Order.ASCENDING)
+				.sortPartition("f2", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on nested field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(3) // parallelize input
+				.sortPartition("f0.f1", Order.ASCENDING)
+				.sortPartition("f1", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new NestedTupleChecker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
+		/*
+		 * Test sort partition on POJO objects with nested field expressions
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		ds
+				.map(new IdMapper()).setParallelism(1) // parallelize input
+				.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
+				.sortPartition("number", Order.DESCENDING)
+				.mapPartition(new OrderCheckMapper<POJO>(new PojoChecker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	@Test
+	public void testSortPartitionDOPChange() throws Exception {
+		/*
+		 * Test sort partition with DOP change
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(3);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds
+				.sortPartition(1, Order.DESCENDING).setParallelism(3) // change DOP
+				.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
+				.distinct()
+				.writeAsText(resultPath);
+
+		env.execute();
+
+		expected = "(true)\n";
+	}
+
+	public static interface OrderChecker<T> extends Serializable {
+
+		public boolean inOrder(T t1, T t2);
+	}
+
+	public static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> {
+		@Override
+		public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
+			return t1.f1 >= t2.f1;
+		}
+	}
+
+	public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
+		@Override
+		public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1,
+								Tuple5<Integer, Long, Integer, String, Long> t2) {
+			return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= t2.f2;
+		}
+	}
+
+	public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
+		@Override
+		public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1,
+								Tuple2<Tuple2<Integer, Integer>, String> t2) {
+			return t1.f0.f1 < t2.f0.f1 ||
+					t1.f0.f1 == t2.f0.f1 && t1.f1.compareTo(t2.f1) >= 0;
+ 		}
+	}
+
+	public static class PojoChecker implements OrderChecker<POJO> {
+		@Override
+		public boolean inOrder(POJO t1,
+							   POJO t2) {
+			return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 ||
+					t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 &&
+							t1.number >= t2.number;
+		}
+	}
+
+	public static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> {
+
+		OrderChecker<T> checker;
+
+		public OrderCheckMapper() {}
+
+		public OrderCheckMapper(OrderChecker<T> checker) {
+			this.checker = checker;
+		}
+
+		@Override
+		public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out) throws Exception {
+
+			Iterator<T> it = values.iterator();
+			if(!it.hasNext()) {
+				out.collect(new Tuple1<Boolean>(true));
+				return;
+			} else {
+				T last = it.next();
+
+				while (it.hasNext()) {
+					T next = it.next();
+					if (!checker.inOrder(last, next)) {
+						out.collect(new Tuple1<Boolean>(false));
+						return;
+					}
+					last = next;
+				}
+				out.collect(new Tuple1<Boolean>(true));
+			}
+		}
+	}
+
+
+	public static class IdMapper<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) throws Exception {
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d849703/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index ef6b8a9..132d82f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -301,8 +301,6 @@ public class CollectionDataSets {
 		public CustomType() {
 		}
 
-		;
-
 		public CustomType(int i, long l, String s) {
 			myInt = i;
 			myLong = l;

