flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3651] Fix faulty RollingSink Restore
Date Tue, 29 Mar 2016 17:14:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2e4498bd3 -> 580a1778c


[FLINK-3651] Fix faulty RollingSink Restore

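On restore, the sink for subtask index i has to clean up leftover files
for subtask i. The pattern used for checking this was not properly
terminated, so the sink for subtask 1 would, for example, also delete some
files belonging to subtask 11, because the prefix `partPrefix + "-1"` also
matches `partPrefix + "-11"`. This would lead to data loss. The pattern is
now terminated with a trailing "-".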

This closes #1830


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/580a1778
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/580a1778
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/580a1778

Branch: refs/heads/master
Commit: 580a1778c1e6a25639218daaf618570ea30e3621
Parents: 2e4498b
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Mar 7 17:15:20 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Mar 29 19:13:50 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 38 ++++++++++----------
 .../fs/RollingSinkFaultTolerance2ITCase.java    |  2 +-
 .../fs/RollingSinkFaultToleranceITCase.java     |  2 +-
 .../StreamFaultToleranceTestBase.java           |  4 +--
 4 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/580a1778/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index afae68f..f186f53 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -336,15 +336,15 @@ public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConf
 					LocatedFileStatus file = bucketFiles.next();
 					if (file.getPath().toString().endsWith(pendingSuffix)) {
 						// only delete files that contain our subtask index
-						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
-							LOG.debug("Deleting leftover pending file {}", file.getPath().toString());
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+							LOG.debug("(OPEN) Deleting leftover pending file {}", file.getPath().toString());
 							fs.delete(file.getPath(), true);
 						}
 					}
 					if (file.getPath().toString().endsWith(inProgressSuffix)) {
 						// only delete files that contain our subtask index
-						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
-							LOG.debug("Deleting leftover in-progress file {}", file.getPath().toString());
+						if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+							LOG.debug("(OPEN) Deleting leftover in-progress file {}", file.getPath().toString());
 							fs.delete(file.getPath(), true);
 						}
 					}
@@ -637,6 +637,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 			writer.flush();
 		}
 		if (outStream != null) {
+			outStream.flush();
 			hflushOrSync(outStream);
 			bucketState.currentFile = currentPartPath.toString();
 			bucketState.currentFileValidLength = outStream.getPos();
@@ -681,10 +682,11 @@ public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConf
 					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
 					// it was still in progress, rename to final path
 					fs.rename(partInProgressPath, partPath);
+				} else if (fs.exists(partPath)) {
+					LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile,
partPath);
 				} else {
-					LOG.error("In-Progress file {} was neither moved to pending nor is still in progress.",
bucketState.currentFile);
-					throw new RuntimeException("In-Progress file " + bucketState.currentFile+ " " +
-							"was neither moved to pending nor is still in progress.");
+					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress.
Possibly, " +
+							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
 				}
 
 				refTruncate = reflectTruncate(fs);
@@ -739,9 +741,11 @@ public class RollingSink<T> extends RichSinkFunction<T> implements
InputTypeConf
 				} else {
 					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 					Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix);
-					FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-					lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-					lengthFileOut.close();
+					if (!fs.exists(validLengthFilePath)) {
+						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+						lengthFileOut.close();
+					}
 				}
 
 				// invalidate in the state object
@@ -772,14 +776,11 @@ public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConf
 
 				try {
 					if (fs.exists(pendingPath)) {
-						LOG.debug(
-								"Moving pending file {} to final location after complete checkpoint {}.",
-								pendingPath,
-								pastCheckpointId);
+						LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint
{}.", pendingPath, pastCheckpointId);
 						fs.rename(pendingPath, finalPath);
 					}
 				} catch (IOException e) {
-					LOG.error("Error while renaming pending file {} to final path {}: {}", pendingPath,
finalPath, e);
+					LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath,
finalPath, e);
 					throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to
final path " + finalPath, e);
 				}
 			}
@@ -800,14 +801,15 @@ public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConf
 				LocatedFileStatus file = bucketFiles.next();
 				if (file.getPath().toString().endsWith(pendingSuffix)) {
 					// only delete files that contain our subtask index
-					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+						LOG.debug("(RESTORE) Deleting pending file {}", file.getPath().toString());
 						fs.delete(file.getPath(), true);
 					}
 				}
 				if (file.getPath().toString().endsWith(inProgressSuffix)) {
 					// only delete files that contain our subtask index
-					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex)) {
-						LOG.debug("Deleting in-progress file {}", file.getPath().toString());
+					if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
+						LOG.debug("(RESTORE) Deleting in-progress file {}", file.getPath().toString());
 						fs.delete(file.getPath(), true);
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/580a1778/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index 7d127ff..e516f50 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -104,7 +104,7 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		int PARALLELISM = 6;
+		int PARALLELISM = 12;
 
 		env.enableCheckpointing(Long.MAX_VALUE);
 		env.setParallelism(PARALLELISM);

http://git-wip-us.apache.org/repos/asf/flink/blob/580a1778/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 65904d2..334e761 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -99,7 +99,7 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
-		int PARALLELISM = 6;
+		int PARALLELISM = 12;
 
 		env.enableCheckpointing(200);
 		env.setParallelism(PARALLELISM);

http://git-wip-us.apache.org/repos/asf/flink/blob/580a1778/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 0fffdb5..530bae9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -37,8 +37,8 @@ import static org.junit.Assert.fail;
  */
 public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
-	protected static final int NUM_TASK_MANAGERS = 2;
-	protected static final int NUM_TASK_SLOTS = 3;
+	protected static final int NUM_TASK_MANAGERS = 3;
+	protected static final int NUM_TASK_SLOTS = 4;
 	protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
 	private static ForkableFlinkMiniCluster cluster;


Mime
View raw message