flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/9] flink git commit: [FLINK-2124] [streaming] Handle 'fromElements()' for non-serializable elements. Cleanup code, and add proper tests.
Date Wed, 08 Jul 2015 18:31:08 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8acc0d2f1 -> e451a4ae0


[FLINK-2124] [streaming] Handle 'fromElements()' for non-serializable elements. Cleanup code,
and add proper tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28713a29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28713a29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28713a29

Branch: refs/heads/master
Commit: 28713a2962d97bf8809ad23d96d395d998a98be8
Parents: e4a030c
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jul 8 17:56:04 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 146 +++++++-------
 .../functions/source/FromElementsFunction.java  | 111 +++++++----
 .../api/functions/source/SourceFunction.java    |   9 +-
 .../api/StreamExecutionEnvironmentTest.java     |  59 +++---
 .../api/functions/FromElementsFunctionTest.java | 191 +++++++++++++++++++
 .../api/functions/ListSourceContext.java        |  50 +++++
 .../api/scala/StreamExecutionEnvironment.scala  |  38 ++--
 7 files changed, 442 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f98a9f0..58348e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -65,6 +65,8 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -432,7 +434,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Creates a new data stream that contains a sequence of numbers. This is a parallel source,
 	 * if you manually set the parallelism to {@code 1}
-	 * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()})
+	 * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)})
 	 * the generated sequence of elements is in order.
 	 *
 	 * @param from
@@ -467,35 +469,38 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
 		if (data.length == 0) {
-			throw new IllegalArgumentException(
-					"fromElements needs at least one element as argument");
+			throw new IllegalArgumentException("fromElements needs at least one element as argument");
 		}
 
-		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
-
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()),
data);
-
-		return addSource(function, "Elements source").returns(typeInfo);
+		TypeInformation<OUT> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(data[0]);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+				+ "; please specify the TypeInformation manually via "
+				+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+		return fromCollection(Arrays.asList(data), typeInfo);
 	}
 
 	/**
 	 * Creates a data stream from the given non-empty collection. The type of the data stream
is that of the
 	 * elements in the collection.
 	 *
-	 * <p>
-	 * The framework will try and determine the exact type from the collection elements. In
case of generic
+	 * <p>The framework will try to determine the exact type from the collection elements.
In case of generic
 	 * elements, it may be necessary to manually supply the type information via
-	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
-	 * <p>
+	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
 	 *
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data
stream source with a
-	 * degree of parallelism one.
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.
a data stream source with a
+	 * parallelism of one.</p>
 	 *
 	 * @param data
-	 * 		The collection of elements to create the data stream from
+	 * 		The collection of elements to create the data stream from.
 	 * @param <OUT>
-	 * 		The type of the returned data stream
-	 * @return The data stream representing the given collection
+	 *     The generic type of the returned data stream.
+	 * @return
+	 *     The data stream representing the given collection
 	 */
 	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data)
{
 		Preconditions.checkNotNull(data, "Collection must not be null");
@@ -503,16 +508,28 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()),
data);
-		checkCollection(data, typeInfo.getTypeClass());
+		OUT first = data.iterator().next();
+		if (first == null) {
+			throw new IllegalArgumentException("Collection must not contain null elements");
+		}
 
-		return addSource(function, "Collection Source").returns(typeInfo);
+		TypeInformation<OUT> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(first);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+					+ "; please specify the TypeInformation manually via "
+					+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+		}
+		return fromCollection(data, typeInfo);
 	}
 
 	/**
-	 * Creates a data stream from the given non-empty collection.Note that this operation will
result in
-	 * a non-parallel data stream source, i.e. a data stream source with a degree of parallelism
one.
+	 * Creates a data stream from the given non-empty collection.
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source,
+	 * i.e., a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The collection of elements to create the data stream from
@@ -522,26 +539,31 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data,
TypeInformation<OUT>
-			typeInfo) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data,
TypeInformation<OUT> typeInfo) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
-		if (data.isEmpty()) {
-			throw new IllegalArgumentException("Collection must not be empty");
+		
+		// must not have null elements and mixed elements
+		FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
+		
+		SourceFunction<OUT> function;
+		try {
+			function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()),
data);
 		}
-
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()),
data);
-		checkCollection(data, typeInfo.getTypeClass());
-
-		return addSource(function, "Collection Source").returns(typeInfo);
+		catch (IOException e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+		return addSource(function, "Collection Source", typeInfo);
 	}
 
 	/**
-	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified
until the actual
-	 * execution happens, the type of data returned by the iterator must be given explicitly
in the form of the type
-	 * class (this is due to the fact that the Java compiler erases the generic type information).
-	 * <p>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data
stream source with a
-	 * degree of parallelism of one.
+	 * Creates a data stream from the given iterator.
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens,
+	 * the type of data returned by the iterator must be given explicitly in the form of the
type
+	 * class (this is due to the fact that the Java compiler erases the generic type information).</p>
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+	 * a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -557,14 +579,16 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified
until the actual
-	 * execution happens, the type of data returned by the iterator must be given explicitly
in the form of the type
-	 * information. This method is useful for cases where the type is generic. In that case,
the type class (as
-	 * given in
-	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data
stream source with a
-	 * degree of parallelism one.
+	 * Creates a data stream from the given iterator.
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens,
+	 * the type of data returned by the iterator must be given explicitly in the form of the
type
+	 * information. This method is useful for cases where the type is generic.
+	 * In that case, the type class (as given in
+	 * {@link #fromCollection(java.util.Iterator, Class)}) does not supply all type information.</p>
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+	 * a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -574,27 +598,20 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the elements in the iterator
 	 */
-	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data,
TypeInformation<OUT>
-			typeInfo) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data,
TypeInformation<OUT> typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
 
 		SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
-		return addSource(function, "Collection Source").returns(typeInfo);
-	}
-
-	// private helper for passing different names
-	private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator,
TypeInformation<OUT>
-			typeInfo, String operatorName) {
-		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+		return addSource(function, "Collection Source", typeInfo);
 	}
 
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable,
allowing the
 	 * framework to create a parallel data stream source that returns the elements in the iterator.
-	 * <p>
-	 * Because the iterator will remain unmodified until the actual execution happens, the type
of data returned by the
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens,
the type of data returned by the
 	 * iterator must be given explicitly in the form of the type class (this is due to the fact
that the Java compiler
-	 * erases the generic type information).
+	 * erases the generic type information).</p>
 	 *
 	 * @param iterator
 	 * 		The iterator that produces the elements of the data stream
@@ -1194,19 +1211,6 @@ public abstract class StreamExecutionEnvironment {
 		currentEnvironment = eef.createExecutionEnvironment();
 	}
 
-	private static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT>
viewedAs) {
-		Preconditions.checkNotNull(viewedAs);
-
-		for (OUT elem : elements) {
-			Preconditions.checkNotNull(elem, "The collection must not contain null elements.");
-
-			if (!viewedAs.isAssignableFrom(elem.getClass())) {
-				throw new IllegalArgumentException("The elements in the collection are not all subclasses
of " +
-						viewedAs.getCanonicalName());
-			}
-		}
-	}
-
 	/**
 	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
 	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 63eb0ad..394fa77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -27,75 +27,108 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
-
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * A stream source function that returns a sequence of elements.
+ * 
+ * <p>Upon construction, this source function serializes the elements using Flink's
type information.
+ * That way, any object transport using Java serialization will not be affected by the serializability
+ * of the elements.</p>
+ * 
+ * @param <T> The type of elements returned by this function.
+ */
 public class FromElementsFunction<T> implements SourceFunction<T> {
 	
 	private static final long serialVersionUID = 1L;
 
+	/** The (de)serializer to be used for the data elements */
 	private final TypeSerializer<T> serializer;
-	private final byte[] elements;
+	
+	/** The actual data elements, in serialized form */
+	private final byte[] elementsSerialized;
+	
+	/** The number of serialized elements */
+	private final int numElements;
 
+	/** Flag to make the source cancelable */
 	private volatile boolean isRunning = true;
 
-	public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
-		this(serializer, new Iterable<T>() {
-			@Override
-			public Iterator<T> iterator() {
-				return new Iterator<T>() {
-					int index = 0;
-
-					@Override
-					public boolean hasNext() {
-						return index < elements.length;
-					}
-
-					@Override
-					public T next() {
-						return elements[index++];
-					}
-
-					@Override
-					public void remove() {
-						throw new UnsupportedOperationException();
-					}
-				};
-			}
-		});
+	
+	public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException
{
+		this(serializer, Arrays.asList(elements));
 	}
-
-	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements)
{
+	
+	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements)
throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
 
+		int count = 0;
 		try {
 			for (T element : elements) {
 				serializer.serialize(element, wrapper);
+				count++;
 			}
-		} catch (IOException e) {
-			// ByteArrayOutputStream doesn't throw IOExceptions when written to
 		}
-		// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn
doesn't do anything.
+		catch (Exception e) {
+			throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+		}
 
 		this.serializer = serializer;
-		this.elements = baos.toByteArray();
+		this.elementsSerialized = baos.toByteArray();
+		this.numElements = count;
 	}
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
-		T value = serializer.createInstance();
-		ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
 		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
 
-		while (isRunning && bais.available() > 0) {
-			value = serializer.deserialize(value, input);
-			ctx.collect(value);
+		int numEmitted = 0;
+		while (isRunning && numEmitted++ < numElements) {
+			T next;
+			try {
+				next = serializer.deserialize(input);
+			}
+			catch (Exception e) {
+				throw new IOException("Failed to deserialize an element from the source. " +
+						"If you are using user-defined serialization (Value and Writable types), check the
" +
+						"serialization functions.\nSerializer is " + serializer);
+			}
+			
+			ctx.collect(next);
 		}
-		// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't
do anything
 	}
 
 	@Override
 	public void cancel() {
 		isRunning = false;
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that all elements in the collection are non-null, and are of the given class,
or
+	 * a subclass thereof.
+	 * 
+	 * @param elements The collection to check.
+	 * @param viewedAs The class to which the elements must be assignable.
+	 * 
+	 * @param <OUT> The generic type of the collection to be checked.
+	 */
+	public static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT>
viewedAs) {
+		for (OUT elem : elements) {
+			if (elem == null) {
+				throw new IllegalArgumentException("The collection contains a null element");
+			}
+
+			if (!viewedAs.isAssignableFrom(elem.getClass())) {
+				throw new IllegalArgumentException("The elements in the collection are not all subclasses
of " +
+						viewedAs.getCanonicalName());
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 921a33b..58ee1da 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -45,11 +45,10 @@ import org.apache.flink.api.common.functions.Function;
  * {@code
  *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long>
{
  *      private long count = 0L;
- *      private volatile boolean isRunning;
+ *      private volatile boolean isRunning = true;
  *
  *      @Override
  *      public void run(SourceContext<T> ctx) {
- *          isRunning = true;
  *          while (isRunning && count < 1000) {
  *              synchronized (ctx.getCheckpointLock()) {
  *                  ctx.collect(count);
@@ -104,16 +103,20 @@ public interface SourceFunction<T> extends Function, Serializable
{
 	 *
 	 * @param <T> The type of the elements produced by the source.
 	 */
-	interface SourceContext<T> {
+	public static interface SourceContext<T> {
 
 		/**
 		 * Emits one element from the source.
+		 * 
+		 * @param element The element to emit.
 		 */
 		void collect(T element);
 
 		/**
 		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
 		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+		 * 
+		 * @return The object to use as the lock.
 		 */
 		Object getCheckpointLock();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 7373276..e2fe599 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -24,14 +24,14 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -49,27 +49,33 @@ public class StreamExecutionEnvironmentTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFromCollectionParallelism() {
-		TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-		boolean seenExpectedException = false;
-
 		try {
-			DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(),
typeInfo)
-					.setParallelism(4);
-		} catch (IllegalArgumentException e) {
-			seenExpectedException = true;
+			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+			StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(),
typeInfo);
+			
+			try {
+				dataStream1.setParallelism(4);
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+	
+			env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo).setParallelism(4);
+	
+			String plan = env.getExecutionPlan();
+			
+			assertTrue("Parallelism for dataStream1 is not right.",
+					plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+			assertTrue("Parallelism for dataStream2 is not right.",
+					plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
-
-		DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(),
typeInfo)
-				.setParallelism(4);
-
-		String plan = env.getExecutionPlan();
-
-		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
-		assertTrue("Parallelism for dataStream1 is not right.",
-				plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
-		assertTrue("Parallelism for dataStream2 is not right.",
-				plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
 	}
 
 	@Test
@@ -119,12 +125,13 @@ public class StreamExecutionEnvironmentTest {
 		return (SourceFunction<T>) operator.getUserFunction();
 	}
 
-	public static class DummySplittableIterator extends SplittableIterator {
+	public static class DummySplittableIterator<T> extends SplittableIterator<T>
{
 		private static final long serialVersionUID = 1312752876092210499L;
 
+		@SuppressWarnings("unchecked")
 		@Override
-		public Iterator[] split(int numPartitions) {
-			return new Iterator[0];
+		public Iterator<T>[] split(int numPartitions) {
+			return (Iterator<T>[]) new Iterator<?>[0];
 		}
 
 		@Override
@@ -138,8 +145,8 @@ public class StreamExecutionEnvironmentTest {
 		}
 
 		@Override
-		public Object next() {
-			return null;
+		public T next() {
+			throw new NoSuchElementException();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
new file mode 100644
index 0000000..db91b33
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}.
+ */
+public class FromElementsFunctionTest {
+	
+	@Test
+	public void testStrings() {
+		try {
+			String[] data = { "Oh", "boy", "what", "a", "show", "!"};
+
+			FromElementsFunction<String> source = new FromElementsFunction<String>(
+					BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);
+			
+			List<String> result = new ArrayList<String>();
+			source.run(new ListSourceContext<String>(result));
+			
+			assertEquals(Arrays.asList(data), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNonJavaSerializableType() {
+		try {
+			MyPojo[] data = { new MyPojo(1, 2), new MyPojo(3, 4), new MyPojo(5, 6) };
+
+			FromElementsFunction<MyPojo> source = new FromElementsFunction<MyPojo>(
+					TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);
+
+			List<MyPojo> result = new ArrayList<MyPojo>();
+			source.run(new ListSourceContext<MyPojo>(result));
+
+			assertEquals(Arrays.asList(data), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerializationError() {
+		try {
+			TypeInformation<SerializationErrorType> info = 
+					new ValueTypeInfo<SerializationErrorType>(SerializationErrorType.class);
+			
+			try {
+				new FromElementsFunction<SerializationErrorType>(
+					info.createSerializer(new ExecutionConfig()), new SerializationErrorType());
+				
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(ExceptionUtils.stringifyException(e).contains("test exception"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDeSerializationError() {
+		try {
+			TypeInformation<DeserializeTooMuchType> info =
+					new ValueTypeInfo<DeserializeTooMuchType>(DeserializeTooMuchType.class);
+
+			FromElementsFunction<DeserializeTooMuchType> source = new FromElementsFunction<DeserializeTooMuchType>(
+					info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());
+			
+			try {
+				source.run(new ListSourceContext<DeserializeTooMuchType>(new ArrayList<DeserializeTooMuchType>()));
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	//  Test Types
+	// ------------------------------------------------------------------------
+	
+	public static class MyPojo {
+		
+		public long val1;
+		public int val2;
+
+		public MyPojo() {}
+		
+		public MyPojo(long val1, int val2) {
+			this.val1 = val1;
+			this.val2 = val2;
+		}
+
+		@Override
+		public int hashCode() {
+			return this.val2;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof MyPojo) {
+				MyPojo that = (MyPojo) obj;
+				return this.val1 == that.val1 && this.val2 == that.val2; 
+			}
+			else {
+				return false;
+			}
+		}
+	}
+	
+	public static class SerializationErrorType implements Value {
+
+		private static final long serialVersionUID = -6037206294939421807L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			throw new IOException("test exception");
+		}
+	}
+
+	public static class DeserializeTooMuchType implements Value {
+
+		private static final long serialVersionUID = -6037206294939421807L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(42);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			in.readLong();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
new file mode 100644
index 0000000..e718633
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.List;
+
+/**
+ * Mock context that collects elements in a List.
+ * 
+ * <p>Every element passed to {@link #collect(Object)} is appended to the list given to the
+ * constructor, so tests can inspect the emitted elements afterwards. The checkpoint lock is a
+ * private object; this class itself never synchronizes on it, and {@code collect} does not
+ * guard the target list (NOTE(review): presumably used single-threaded in tests).
+ * 
+ * @param <T> Type of the collected elements.
+ */
+public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
+	
+	/** Object returned by {@link #getCheckpointLock()}. */
+	private final Object lock = new Object();
+	
+	/** Caller-supplied list that receives all collected elements. */
+	private final List<T> target;
+
+	/**
+	 * Creates a context that appends collected elements to the given list.
+	 *
+	 * @param target The list to add elements to; must not be null.
+	 * @throws NullPointerException if {@code target} is null.
+	 */
+	public ListSourceContext(List<T> target) {
+		if (target == null) {
+			throw new NullPointerException("target list must not be null");
+		}
+		this.target = target;
+	}
+
+	@Override
+	public void collect(T element) {
+		target.add(element);
+	}
+
+	@Override
+	public Object getCheckpointLock() {
+		return lock;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7215a4d..70e652f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandleProvider
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.SplittableIterator
 
@@ -47,7 +47,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
+   * [[DataStream#setParallelism(int)]].
    * @deprecated Please use [[setParallelism]]
    */
   @deprecated
@@ -57,7 +57,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
    * @deprecated Please use [[getParallelism]]
    */
   @deprecated
@@ -67,7 +67,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
+   * [[DataStream#setParallelism(int)]].
    */
   def setParallelism(parallelism: Int): Unit = {
     javaEnv.setParallelism(parallelism)
@@ -75,7 +75,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
    */
   def getParallelism = javaEnv.getParallelism
 
@@ -86,15 +86,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * can result in three logical modes:
    *
    * <ul>
-   * <li>
-   * A positive integer triggers flushing periodically by that integer</li>
-   * <li>
-   * 0 triggers flushing after every record thus minimizing latency</li>
-   * <li>
-   * -1 triggers flushing only when the output buffer is full thus maximizing
-   * throughput</li>
+   *   <li>A positive integer triggers flushing periodically by that integer</li>
+   *   <li>0 triggers flushing after every record thus minimizing latency</li>
+   *   <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
    * </ul>
-   *
    */
   def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
     javaEnv.setBufferTimeout(timeoutMillis)
@@ -127,7 +122,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   @deprecated
@@ -142,7 +137,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * 
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
@@ -156,7 +151,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   def enableCheckpointing() : StreamExecutionEnvironment = {
@@ -284,10 +279,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
-      scala.collection.JavaConversions.asJavaCollection(data))
-
-    javaEnv.addSource(sourceFunction).returns(typeInfo)
+    javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
   }
 
   /**
@@ -460,7 +452,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getExecutionPlan = javaEnv.getExecutionPlan
 
   /**
-   * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+   * Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
    *
    * @return The StreamGraph representing the transformations
    */
@@ -468,7 +460,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
    */
   private[flink] def scalaClean[F <: AnyRef](f: F): F = {
     if (getConfig.isClosureCleanerEnabled) {
@@ -484,7 +476,7 @@ object StreamExecutionEnvironment {
 
   /**
    * Sets the default parallelism that will be used for the local execution
-   * environment created by {@link #createLocalEnvironment()}.
+   * environment created by [[createLocalEnvironment()]].
    *
    * @param parallelism
    * The parallelism to use as the default local parallelism.


Mime
View raw message