flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-1105] [api-extending] Add support for local sorting on data sinks.
Date Tue, 03 Feb 2015 10:12:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3d9267eb3 -> 94c3e9c08


[FLINK-1105] [api-extending] Add support for local sorting on data sinks.

This closes #347


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f475599
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f475599
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f475599

Branch: refs/heads/master
Commit: 5f475599264d3d388e81cbceb71b641e1bb7d83f
Parents: 3d9267e
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Jan 26 18:44:11 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Feb 3 10:51:30 2015 +0100

----------------------------------------------------------------------
 .../common/operators/GenericDataSinkBase.java   |  30 +-
 .../flink/api/java/operators/DataSink.java      | 120 +++++++
 .../flink/api/java/operator/DataSinkTest.java   | 346 +++++++++++++++++++
 .../test/javaApiOperators/DataSinkITCase.java   | 341 ++++++++++++++++++
 .../util/CollectionDataSets.java                |  13 +
 5 files changed, 849 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f475599/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index a0f367e..242e83d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.api.common.operators;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.flink.api.common.distributions.DataDistribution;
@@ -27,6 +29,10 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.util.Visitor;
 
@@ -294,7 +300,29 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing>
{
 	
 	protected void executeOnCollections(List<IN> inputData) throws Exception {
 		OutputFormat<IN> format = this.formatWrapper.getUserCodeObject();
-		
+		TypeInformation<IN> inputType = getInput().getOperatorInfo().getOutputType();
+
+		if (this.localOrdering != null) {
+			int[] sortColumns = this.localOrdering.getFieldPositions();
+			boolean[] sortOrderings = this.localOrdering.getFieldSortDirections();
+
+			final TypeComparator<IN> sortComparator;
+			if (inputType instanceof CompositeType) {
+				sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0);
+			} else if (inputType instanceof AtomicType) {
+				sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0]);
+			} else {
+				throw new UnsupportedOperationException("Local output sorting does not support type "+inputType+" yet.");
+			}
+
+			Collections.sort(inputData, new Comparator<IN>() {
+				@Override
+				public int compare(IN o1, IN o2) {
+					return sortComparator.compare(o1, o2);
+				}
+			});
+		}
+
 		if(format instanceof InitializeOnMaster) {
 			((InitializeOnMaster)format).initializeGlobal(1);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f475599/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index ee0129c..9da5433 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -18,16 +18,22 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.api.java.DataSet;
 
+import java.util.Arrays;
+
 
 public class DataSink<T> {
 	
@@ -43,6 +49,10 @@ public class DataSink<T> {
 
 	private Configuration parameters;
 
+	private int[] sortKeyPositions;
+
+	private Order[] sortOrders;
+
 	public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T>
type) {
 		if (format == null) {
 			throw new IllegalArgumentException("The output format must not be null.");
@@ -83,6 +93,107 @@ public class DataSink<T> {
 	}
 
 	/**
+	 * Sorts each local partition of a {@link org.apache.flink.api.java.tuple.Tuple} data set
+	 * on the specified field in the specified {@link Order} before it is emitted by the output format.<br/>
+	 * <b>Note: Only tuple data sets can be sorted using integer field indices.</b><br/>
+	 * The tuple data set can be sorted on multiple fields in different orders
+	 * by chaining {@link #sortLocalOutput(int, Order)} calls.
+	 *
+	 * @param field The Tuple field on which the data set is locally sorted.
+	 * @param order The Order in which the specified Tuple field is locally sorted.
+	 * @return This data sink operator with specified output order.
+	 *
+	 * @see org.apache.flink.api.java.tuple.Tuple
+	 * @see Order
+	 */
+	public DataSink<T> sortLocalOutput(int field, Order order) {
+
+		if (!this.type.isTupleType()) {
+			throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
+		}
+		if (field >= this.type.getArity()) {
+			throw new InvalidProgramException("Order key out of tuple bounds.");
+		}
+
+		if(this.sortKeyPositions == null) {
+			// set sorting info
+			this.sortKeyPositions = new int[] {field};
+			this.sortOrders = new Order[] {order};
+		} else {
+			// append sorting info to existing info
+			int newLength = this.sortKeyPositions.length + 1;
+			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
+			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
+			this.sortKeyPositions[newLength-1] = field;
+			this.sortOrders[newLength-1] = order;
+		}
+		return this;
+	}
+
+	/**
+	 * Sorts each local partition of a data set on the field(s) specified by the field expression
+	 * in the specified {@link Order} before it is emitted by the output format.<br/>
+	 * <b>Note: Non-composite types can only be sorted on the full element which is specified
by
+	 * a wildcard expression ("*" or "_").</b><br/>
+	 * Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different
orders
+	 * by chaining {@link #sortLocalOutput(String, Order)} calls.
+	 *
+	 * @param fieldExpression The field expression for the field(s) on which the data set is
locally sorted.
+	 * @param order The Order in which the specified field(s) are locally sorted.
+	 * @return This data sink operator with specified output order.
+	 *
+	 * @see Order
+	 */
+	public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {
+
+		int numFields;
+		int[] fields;
+		Order[] orders;
+
+		if(this.type instanceof CompositeType) {
+			// compute flat field positions for (nested) sorting fields
+			Keys.ExpressionKeys<T> ek;
+			try {
+				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
+			} catch(IllegalArgumentException iae) {
+				throw new InvalidProgramException("Invalid specification of field expression.", iae);
+			}
+			fields = ek.computeLogicalKeyPositions();
+			numFields = fields.length;
+			orders = new Order[numFields];
+			Arrays.fill(orders, order);
+		} else {
+			fieldExpression = fieldExpression.trim();
+			if (!(fieldExpression.equals("*") || fieldExpression.equals("_"))) {
+				throw new InvalidProgramException("Output sorting of non-composite types can only be defined on the full type. " +
+						"Use a field wildcard for that (\"*\" or \"_\")");
+			} else {
+				numFields = 1;
+				fields = new int[]{0};
+				orders = new Order[]{order};
+			}
+		}
+
+		if(this.sortKeyPositions == null) {
+			// set sorting info
+			this.sortKeyPositions = fields;
+			this.sortOrders = orders;
+		} else {
+			// append sorting info to existing info
+			int oldLength = this.sortKeyPositions.length;
+			int newLength = oldLength + numFields;
+			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
+			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
+			for(int i=0; i<numFields; i++) {
+				this.sortKeyPositions[oldLength+i] = fields[i];
+				this.sortOrders[oldLength+i] = orders[i];
+			}
+		}
+
+		return this;
+	}
+
+	/**
 	 * @return Configuration for the OutputFormat.
 	 */
 	public Configuration getParameters() {
@@ -116,6 +227,15 @@ public class DataSink<T> {
 			// if no dop has been specified, use dop of input operator to enable chaining
 			sink.setDegreeOfParallelism(input.getDegreeOfParallelism());
 		}
+
+		if(this.sortKeyPositions != null) {
+			// configure output sorting
+			Ordering ordering = new Ordering();
+			for(int i=0; i<this.sortKeyPositions.length; i++) {
+				ordering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
+			}
+			sink.setLocalOrder(ordering);
+		}
 		
 		return sink;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f475599/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
new file mode 100644
index 0000000..7a7ed14
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataSinkTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData
= new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo
= new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+	// POJO DATA
+	private final List<CustomType> pojoData = new ArrayList<CustomType>();
+
+	@Before
+	public void fillPojoData() {
+		if(pojoData.isEmpty()) {
+			pojoData.add(new CustomType());
+		}
+	}
+
+	@Test
+	public void testTupleSingleOrderIdx() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput(0, Order.ANY);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testTupleTwoOrderIdx() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput(0, Order.ASCENDING)
+					.sortLocalOutput(3, Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testTupleSingleOrderExp() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput("f0", Order.ANY);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testTupleSingleOrderExpFull() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput("*", Order.ANY);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testTupleTwoOrderExp() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput("f1", Order.ASCENDING)
+					.sortLocalOutput("f4", Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testTupleTwoOrderMixed() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput(4, Order.ASCENDING)
+					.sortLocalOutput("f2", Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailTupleIndexOutOfBounds() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// must not work
+		tupleDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput(3, Order.ASCENDING)
+				.sortLocalOutput(5, Order.DESCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailTupleInv() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// must not work
+		tupleDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("notThere", Order.ASCENDING)
+				.sortLocalOutput("f4", Order.DESCENDING);
+	}
+
+	@Test
+	public void testPrimitiveOrder() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Long> longDs = env
+				.generateSequence(0,2);
+
+		// should work
+		try {
+			longDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput("*", Order.ASCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailPrimitiveOrder1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Long> longDs = env
+				.generateSequence(0,2);
+
+		// must not work
+		longDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput(0, Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailPrimitiveOrder2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Long> longDs = env
+				.generateSequence(0,2);
+
+		// must not work
+		longDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("0", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailPrimitiveOrder3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Long> longDs = env
+				.generateSequence(0,2);
+
+		// must not work
+		longDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("nope", Order.ASCENDING);
+	}
+
+	@Test
+	public void testPojoSingleOrder() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// should work
+		try {
+			pojoDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput("myString", Order.ASCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testPojoSingleOrderFull() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// should work
+		try {
+			pojoDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput("*", Order.ASCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testPojoTwoOrder() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// should work
+		try {
+			pojoDs.writeAsText("/tmp/willNotHappen")
+					.sortLocalOutput("myLong", Order.ASCENDING)
+					.sortLocalOutput("myString", Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailPojoIdx() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput(1, Order.DESCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testFailPojoInvalidField() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("myInt", Order.ASCENDING)
+				.sortLocalOutput("notThere", Order.DESCENDING);
+	}
+
+	/**
+	 * Custom data type, for testing purposes.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+
+		public CustomType() {
+		};
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f475599/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
new file mode 100644
index 0000000..ca9dc16
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for data sinks
+ */
+
+@RunWith(Parameterized.class)
+public class DataSinkITCase extends MultipleProgramsTestBase {
+
+	public DataSinkITCase(ExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@Test
+	public void testIntSortingDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+		ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testStringSortingDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		ds.writeAsText(resultPath).sortLocalOutput("*", Order.ASCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "Hello\n" +
+				"Hello world\n" +
+				"Hello world, how are you?\n" +
+				"Hi\n" +
+				"I am fine.\n" +
+				"LOL\n" +
+				"Luke Skywalker\n" +
+				"Random comment\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+	}
+
+	@Test
+	public void testTupleSortingSingleAscDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.ASCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n" +
+				"6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" +
+				"8,4,Comment#2\n" +
+				"9,4,Comment#3\n" +
+				"10,4,Comment#4\n" +
+				"11,5,Comment#5\n" +
+				"12,5,Comment#6\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" +
+				"15,5,Comment#9\n" +
+				"16,6,Comment#10\n" +
+				"17,6,Comment#11\n" +
+				"18,6,Comment#12\n" +
+				"19,6,Comment#13\n" +
+				"20,6,Comment#14\n" +
+				"21,6,Comment#15\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testTupleSortingSingleDescDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.DESCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "21,6,Comment#15\n" +
+				"20,6,Comment#14\n" +
+				"19,6,Comment#13\n" +
+				"18,6,Comment#12\n" +
+				"17,6,Comment#11\n" +
+				"16,6,Comment#10\n" +
+				"15,5,Comment#9\n" +
+				"14,5,Comment#8\n" +
+				"13,5,Comment#7\n" +
+				"12,5,Comment#6\n" +
+				"11,5,Comment#5\n" +
+				"10,4,Comment#4\n" +
+				"9,4,Comment#3\n" +
+				"8,4,Comment#2\n" +
+				"7,4,Comment#1\n" +
+				"6,3,Luke Skywalker\n" +
+				"5,3,I am fine.\n" +
+				"4,3,Hello world, how are you?\n" +
+				"3,2,Hello world\n" +
+				"2,2,Hello\n" +
+				"1,1,Hi\n";
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testTupleSortingDualDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		ds.writeAsCsv(resultPath).sortLocalOutput(1, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "16,6,Comment#10\n" +
+				"17,6,Comment#11\n" +
+				"18,6,Comment#12\n" +
+				"19,6,Comment#13\n" +
+				"20,6,Comment#14\n" +
+				"21,6,Comment#15\n" +
+				"11,5,Comment#5\n" +
+				"12,5,Comment#6\n" +
+				"13,5,Comment#7\n" +
+				"14,5,Comment#8\n" +
+				"15,5,Comment#9\n" +
+				"7,4,Comment#1\n" +
+				"8,4,Comment#2\n" +
+				"9,4,Comment#3\n" +
+				"10,4,Comment#4\n" +
+				"4,3,Hello world, how are you?\n" +
+				"5,3,I am fine.\n" +
+				"6,3,Luke Skywalker\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"1,1,Hi\n";
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testTupleSortingNestedDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
+				CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+		ds.writeAsText(resultPath)
+				.sortLocalOutput("f0.f1", Order.ASCENDING)
+				.sortLocalOutput("f1", Order.DESCENDING)
+				.setParallelism(1);
+
+		env.execute();
+
+		expected =
+				"((2,1),a)\n" +
+				"((2,2),b)\n" +
+				"((1,2),a)\n" +
+				"((3,3),c)\n" +
+				"((1,3),a)\n" +
+				"((3,6),c)\n" +
+				"((4,9),c)\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+	}
+
+	@Test
+	public void testPojoSortingSingleDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1);
+
+		env.execute();
+
+		expected = "1 First (10,100,1000,One) 10100\n" +
+				"2 First_ (10,105,1000,One) 10200\n" +
+				"3 First (11,102,3000,One) 10200\n" +
+				"4 First_ (11,106,1000,One) 10300\n" +
+				"5 First (11,102,2000,One) 10100\n" +
+				"6 Second_ (20,200,2000,Two) 10100\n" +
+				"7 Third (31,301,2000,Three) 10200\n" +
+				"8 Third_ (30,300,1000,Three) 10100\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testPojoSortingDualDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		ds.writeAsText(resultPath)
+				.sortLocalOutput("str", Order.ASCENDING)
+				.sortLocalOutput("number", Order.DESCENDING)
+				.setParallelism(1);
+
+		env.execute();
+
+		expected =
+				"5 First (11,102,2000,One) 10100\n" +
+				"3 First (11,102,3000,One) 10200\n" +
+				"1 First (10,100,1000,One) 10100\n" +
+				"4 First_ (11,106,1000,One) 10300\n" +
+				"2 First_ (10,105,1000,One) 10200\n" +
+				"6 Second_ (20,200,2000,Two) 10100\n" +
+				"7 Third (31,301,2000,Three) 10200\n" +
+				"8 Third_ (30,300,1000,Three) 10100\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+
+	}
+
+	@Test
+	public void testPojoSortingNestedDOP1() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
+		ds.writeAsText(resultPath)
+				.sortLocalOutput("nestedTupleWithCustom.f0", Order.ASCENDING)
+				.sortLocalOutput("nestedTupleWithCustom.f1.myInt", Order.DESCENDING)
+				.sortLocalOutput("nestedPojo.longNumber", Order.ASCENDING)
+				.setParallelism(1);
+
+		env.execute();
+
+		expected =
+				"2 First_ (10,105,1000,One) 10200\n" +
+				"1 First (10,100,1000,One) 10100\n" +
+				"4 First_ (11,106,1000,One) 10300\n" +
+				"5 First (11,102,2000,One) 10100\n" +
+				"3 First (11,102,3000,One) 10200\n" +
+				"6 Second_ (20,200,2000,Two) 10100\n" +
+				"8 Third_ (30,300,1000,Three) 10100\n" +
+				"7 Third (31,301,2000,Three) 10200\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+	}
+
+	@Test
+	public void testSortingDOP4() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Long> ds = env.generateSequence(0, 1000);
+		// randomize
+		ds.map(new MapFunction<Long, Long>() {
+
+			Random rand = new Random(1234l);
+			@Override
+			public Long map(Long value) throws Exception {
+				return rand.nextLong();
+			}
+		}).writeAsText(resultPath)
+				.sortLocalOutput("*", Order.ASCENDING)
+				.setParallelism(4);
+
+		env.execute();
+
+		BufferedReader[] resReaders = getResultReader(resultPath);
+		for (BufferedReader br : resReaders) {
+			long cmp = Long.MIN_VALUE;
+			while(br.ready()) {
+				long cur = Long.parseLong(br.readLine());
+				assertTrue("Invalid order of sorted output", cmp <= cur);
+				cmp = cur;
+			}
+			br.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f475599/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 54f088a..632e7c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -338,6 +338,19 @@ public class CollectionDataSets {
 		return env.fromCollection(data);
 	}
 
+	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<POJO>();
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
+		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
+		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+		return env.fromCollection(data);
+	}
+
 	public static class POJO {
 		public int number;
 		public String str;


Mime
View raw message