flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [9/9] flink git commit: [FLINK-2330] [streaming] Make FromElementsFunction checkpointable
Date Wed, 08 Jul 2015 18:31:16 GMT
[FLINK-2330] [streaming] Make FromElementsFunction checkpointable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e451a4ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e451a4ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e451a4ae

Branch: refs/heads/master
Commit: e451a4ae00d84c2ed1aae1bec6cc2c43caa2bf90
Parents: 28713a2
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jul 8 19:02:59 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 8 20:28:41 2015 +0200

----------------------------------------------------------------------
 .../functions/source/FromElementsFunction.java  | 74 ++++++++++++++++++--
 .../api/functions/FromElementsFunctionTest.java | 74 ++++++++++++++++++++
 .../api/functions/ListSourceContext.java        | 16 +++++
 3 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 394fa77..28544ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -39,7 +40,7 @@ import java.util.Collection;
  * 
  * @param <T> The type of elements returned by this function.
  */
-public class FromElementsFunction<T> implements SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -52,6 +53,12 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	/** The number of serialized elements */
 	private final int numElements;
 
+	/** The number of elements emitted already */
+	private volatile int numElementsEmitted;
+
+	/** The number of elements to skip initially */
+	private volatile int numElementsToSkip;
+	
 	/** Flag to make the source cancelable */
 	private volatile boolean isRunning = true;
 
@@ -83,10 +90,29 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
 		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
-		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
-
-		int numEmitted = 0;
-		while (isRunning && numEmitted++ < numElements) {
+		final DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		
+		// if we are restored from a checkpoint and need to skip elements, skip them now.
+		int toSkip = numElementsToSkip;
+		if (toSkip > 0) {
+			try {
+				while (toSkip > 0) {
+					serializer.deserialize(input);
+					toSkip--;
+				}
+			}
+			catch (Exception e) {
+				throw new IOException("Failed to deserialize an element from the source. " +
+						"If you are using user-defined serialization (Value and Writable types), check the " +
+						"serialization functions.\nSerializer is " + serializer);
+			}
+			
+			this.numElementsEmitted = this.numElementsToSkip;
+		}
+		
+		final Object lock = ctx.getCheckpointLock();
+		
+		while (isRunning && numElementsEmitted < numElements) {
 			T next;
 			try {
 				next = serializer.deserialize(input);
@@ -97,7 +123,10 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 						"serialization functions.\nSerializer is " + serializer);
 			}
 			
-			ctx.collect(next);
+			synchronized (lock) {
+				ctx.collect(next);
+				numElementsEmitted++;
+			}
 		}
 	}
 
@@ -105,6 +134,39 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	public void cancel() {
 		isRunning = false;
 	}
+
+
+	/**
+	 * Gets the number of elements produced in total by this function.
+	 * 
+	 * @return The number of elements produced in total.
+	 */
+	public int getNumElements() {
+		return numElements;
+	}
+
+	/**
+	 * Gets the number of elements emitted so far.
+	 * 
+	 * @return The number of elements emitted so far.
+	 */
+	public int getNumElementsEmitted() {
+		return numElementsEmitted;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return this.numElementsEmitted;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		this.numElementsToSkip = state;
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index db91b33..9c3653b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -125,6 +129,76 @@ public class FromElementsFunctionTest {
 		}
 	}
 	
+	@Test
+	public void testCheckpointAndRestore() {
+		try {
+			final int NUM_ELEMENTS = 10000;
+			
+			List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS);
+			List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS);
+			
+			for (int i = 0; i < NUM_ELEMENTS; i++) {
+				data.add(i);
+			}
+			
+			final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
+			final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
+			
+			final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
+			
+			final Throwable[] error = new Throwable[1];
+			
+			// run the source asynchronously
+			Thread runner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						source.run(ctx);
+					}
+					catch (Throwable t) {
+						error[0] = t;
+					}
+				}
+			};
+			runner.start();
+			
+			// wait for a bit 
+			Thread.sleep(1000);
+			
+			// make a checkpoint
+			int count;
+			List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
+			
+			synchronized (ctx.getCheckpointLock()) {
+				count = source.snapshotState(566, System.currentTimeMillis());
+				checkpointData.addAll(result);
+			}
+			
+			// cancel the source
+			source.cancel();
+			runner.join();
+			
+			// check for errors
+			if (error[0] != null) {
+				System.err.println("Error in asynchronous source runner");
+				error[0].printStackTrace();
+				fail("Error in asynchronous source runner");
+			}
+			
+			// recovery run
+			SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
+			sourceCopy.restoreState(count);
+			
+			sourceCopy.run(newCtx);
+			
+			assertEquals(data, checkpointData);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	
 	// ------------------------------------------------------------------------
 	//  Test Types

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e718633..f241955 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -33,14 +33,30 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
 	
 	private final List<T> target;
 
+	private final long delay;
+	
 	
 	public ListSourceContext(List<T> target) {
+		this(target, 0L);
+	}
+
+	public ListSourceContext(List<T> target, long delay) {
 		this.target = target;
+		this.delay = delay;
 	}
 
 	@Override
 	public void collect(T element) {
 		target.add(element);
+		
+		if (delay > 0) {
+			try {
+				Thread.sleep(delay);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+		}
 	}
 
 	@Override


Mime
View raw message