flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/14] flink git commit: [FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception.
Date Wed, 13 Apr 2016 08:30:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master f315c5700 -> db85f3858


[FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception.

This closes #1857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb085ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb085ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb085ec

Branch: refs/heads/master
Commit: 6bb085ec6d70e196b7b61bec5f6dc3f924ca7906
Parents: 693d5ab
Author: gallenvara <gallenvara@126.com>
Authored: Wed Apr 6 16:04:32 2016 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 45 ++++++++++++++++-
 .../flink/api/java/io/FromElementsTest.java     | 51 ++++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 33 +++++++++++++
 .../api/StreamExecutionEnvironmentTest.java     | 27 +++++++++++
 4 files changed, 155 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1363e26..89c817d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -777,7 +777,50 @@ public abstract class ExecutionEnvironment {
 			throw new IllegalArgumentException("The number of elements must not be zero.");
 		}
 		
-		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
+		TypeInformation<X> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(data[0]);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+					+ "; please specify the TypeInformation manually via "
+					+ "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+
+		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
+	}
+	
+	/**
+	 * Creates a new data set that contains the given elements. The framework will determine
the type according to the 
+	 * based type user supplied. The elements should be the same or be the subclass to the based
type. 
+	 * The sequence of elements must not be empty.
+	 * Note that this operation will result in a non-parallel data source, i.e. a data source
with
+	 * a parallelism of one.
+	 *
+	 * @param type The base class type for every element in the collection.
+	 * @param data The elements to make up the data set.
+	 * @return A DataSet representing the given list of elements.
+	 */
+	@SafeVarargs
+	public final <X> DataSource<X> fromElements(Class<X> type, X... data)
{
+		if (data == null) {
+			throw new IllegalArgumentException("The data must not be null.");
+		}
+		if (data.length == 0) {
+			throw new IllegalArgumentException("The number of elements must not be zero.");
+		}
+		
+		TypeInformation<X> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForClass(type);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+					+ "; please specify the TypeInformation manually via "
+					+ "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+
+		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
new file mode 100644
index 0000000..2f403aa
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Test;
+
+public class FromElementsTest {
+
+	@Test
+	public void fromElementsWithBaseTypeTest1() {
+		ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
+		executionEnvironment.fromElements(ParentType.class, new SubType(1, "Java"), new ParentType(1,
"hello"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void fromElementsWithBaseTypeTest2() {
+		ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
+		executionEnvironment.fromElements(SubType.class, new SubType(1, "Java"), new ParentType(1,
"hello"));
+	}
+
+	public static class ParentType {
+		int num;
+		String string;
+		public ParentType(int num, String string) {
+			this.num = num;
+			this.string = string;
+		}
+	}
+
+	public static class SubType extends ParentType{
+		public SubType(int num, String string) {
+			super(num, string);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fb7ec9f..ae4758f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -673,6 +673,39 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a new data set that contains the given elements. The framework will determine
the type according to the 
+	 * based type user supplied. The elements should be the same or be the subclass to the based
type. 
+	 * The sequence of elements must not be empty.
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data
stream source with a
+	 * degree of parallelism one.
+	 *
+	 * @param type
+	 * 		The based class type in the collection.
+	 * @param data
+	 * 		The array of elements to create the data stream from.
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the given array of elements
+	 */
+	@SafeVarargs
+	public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type,
OUT... data) {
+		if (data.length == 0) {
+			throw new IllegalArgumentException("fromElements needs at least one element as argument");
+		}
+
+		TypeInformation<OUT> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForClass(type);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+					+ "; please specify the TypeInformation manually via "
+					+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+		return fromCollection(Arrays.asList(data), typeInfo);
+	}
+
+	/**
 	 * Creates a data stream from the given non-empty collection. The type of the data stream
is that of the
 	 * elements in the collection.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 67a4b05..5e596b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -45,6 +45,18 @@ import org.junit.Test;
 public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase {
 
 	@Test
+	public void fromElementsWithBaseTypeTest1() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void fromElementsWithBaseTypeTest2() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testFromCollectionParallelism() {
 		try {
@@ -159,4 +171,19 @@ public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTes
 			throw new UnsupportedOperationException();
 		}
 	}
+
+	public static class ParentClass {
+		int num;
+		String string;
+		public ParentClass(int num, String string) {
+			this.num = num;
+			this.string = string;
+		}
+	}
+
+	public static class SubClass extends ParentClass{
+		public SubClass(int num, String string) {
+			super(num, string);
+		}
+	}
 }


Mime
View raw message