flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/6] flink git commit: [hotfix] [misc] Minor code cleanups
Date Thu, 20 Apr 2017 12:17:13 GMT
[hotfix] [misc] Minor code cleanups


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d8627cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d8627cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d8627cc

Branch: refs/heads/master
Commit: 4d8627ccaa7fec273cb97244c771473c78485bd4
Parents: 1ed0727
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Mar 27 17:41:04 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Apr 20 10:59:50 2017 +0200

----------------------------------------------------------------------
 .../core/io/VersionedIOReadableWritable.java    |  2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   | 30 +++++++++++++-------
 .../ContinuousFileMonitoringFunction.java       |  2 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |  6 ++--
 4 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d8627cc/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
index 31d570c..bad9cef 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -53,7 +53,7 @@ public abstract class VersionedIOReadableWritable implements IOReadableWritable,
 	 */
 	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
 		if (!isCompatibleVersion(foundVersion)) {
-			long expectedVersion = getVersion();
+			int expectedVersion = getVersion();
 			throw new VersionMismatchException(
 					"Incompatible version: found " + foundVersion + ", required " + expectedVersion);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d8627cc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index b7eb037..e1182ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -62,6 +62,19 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class PendingCheckpoint {
 
+	/**
+	 * Result of the {@link PendingCheckpoint#acknowledgedTasks} method.
+	 */
+	public enum TaskAcknowledgeResult {
+		SUCCESS, // successful acknowledge of the task
+		DUPLICATE, // acknowledge message is a duplicate
+		UNKNOWN, // unknown task acknowledged
+		DISCARDED // pending checkpoint has been discarded
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** The PendingCheckpoint logs to the same logger as the CheckpointCoordinator */
 	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
 
 	private final Object lock = new Object();
@@ -201,7 +214,8 @@ public class PendingCheckpoint {
 	}
 
 	/**
-	 * Sets the handle for the canceller to this pending checkoint.
+	 * Sets the handle for the canceller to this pending checkpoint. This method fails
+	 * with an exception if a handle has already been set.
 	 * 
 	 * @return true, if the handle was set, false, if the checkpoint is already disposed;
 	 */
@@ -422,15 +436,7 @@ public class PendingCheckpoint {
 		}
 	}
 
-	/**
-	 * Result of the {@link PendingCheckpoint#acknowledgedTasks} method.
-	 */
-	public enum TaskAcknowledgeResult {
-		SUCCESS, // successful acknowledge of the task
-		DUPLICATE, // acknowledge message is a duplicate
-		UNKNOWN, // unknown task acknowledged
-		DISCARDED // pending checkpoint has been discarded
-	}
+	
 
 	// ------------------------------------------------------------------------
 	//  Cancellation
@@ -546,7 +552,9 @@ public class PendingCheckpoint {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d8627cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index a7539d1..8b82d84 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -83,7 +83,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	private final int readerParallelism;
 
 	/** The {@link FileInputFormat} to be read. */
-	private FileInputFormat<OUT> format;
+	private final FileInputFormat<OUT> format;
 
 	/** The interval between consecutive path scans. */
 	private final long interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/4d8627cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1a6fa8f..3676953 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -168,8 +168,10 @@ public class SourceStreamTaskTest {
 				// simulate some work
 				try {
 					Thread.sleep(readDelay);
-				} catch (InterruptedException e) {
-					e.printStackTrace();
+				}
+				catch (InterruptedException e) {
+					// ignore and reset interruption state
+					Thread.currentThread().interrupt();
 				}
 
 				synchronized (lockObject) {


Mime
View raw message