flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-5318] Make the RollingSink backwards compatible.
Date Fri, 13 Jan 2017 10:55:22 GMT
[FLINK-5318] Make the RollingSink backwards compatible.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbca3293
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbca3293
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbca3293

Branch: refs/heads/master
Commit: bbca3293ab84e8af124e66fcdfc394bbf8d5954b
Parents: 7a2d3be
Author: kl0u <kkloudas@gmail.com>
Authored: Fri Jan 6 15:38:28 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Jan 13 11:46:45 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  32 +++-
 .../connectors/fs/bucketing/BucketingSink.java  |   8 +-
 .../fs/bucketing/RollingSinkMigrationTest.java  | 183 +++++++++++++++++++
 ...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes
 4 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index fc4a35e..98eb2d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
@@ -128,7 +129,8 @@ import java.util.UUID;
  */
 @Deprecated
 public class RollingSink<T> extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
+		implements InputTypeConfigurable, CheckpointedFunction,
+					CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T>
 		Preconditions.checkArgument(this.restoredBucketStates == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
-		initFileSystem();
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+		}
 
 		if (this.refTruncate == null) {
 			this.refTruncate = reflectTruncate(fs);
@@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 				} else {
 					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
-					if (!fs.exists(validLengthFilePath)) {
+					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
 						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
 						lengthFileOut.close();
@@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  Backwards compatibility with Flink 1.1
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(BucketState state) throws Exception {
+		LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+		}
+
+		handleRestoredBucketState(state);
+	}
+
+	// --------------------------------------------------------------------------------------------
 	//  Setters for User configuration values
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cf2c373..e8bff21 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,7 +58,7 @@ import java.util.UUID;
 import java.util.Iterator;
 
 /**
- * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
+ * Sink that emits its input elements to {@link FileSystem} files within
  * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
  *
  * <p>
@@ -124,9 +124,9 @@ import java.util.Iterator;
  *     </li>
  *     <li>
  *         The part files are written using an instance of {@link Writer}. By default, a
- *         {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- *         of {@code toString()} for every element, separated by newlines. You can configure the writer using  the
- *         {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ *         {@link StringWriter} is used, which writes the result of {@code toString()} for
+ *         every element, separated by newlines. You can configure the writer using the
+ *         {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
  *         can be used to write Hadoop {@code SequenceFiles}.
  *     </li>
  * </ol>

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
new file mode 100644
index 0000000..0c5e16b
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -0,0 +1,183 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+@Deprecated
+public class RollingSinkMigrationTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Test
+	public void testMigration() throws Exception {
+
+		/*
+		* Code ran to get the snapshot:
+		*
+		* final File outDir = tempFolder.newFolder();
+
+		RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness1.setup();
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.processElement(new StreamRecord<>("test3", 0L));
+		testHarness1.processElement(new StreamRecord<>("test4", 0L));
+		testHarness1.processElement(new StreamRecord<>("test5", 0L));
+
+		checkFs(outDir, 1, 4, 0, 0);
+
+		StreamTaskState taskState = testHarness1.snapshot(0, 0);
+		testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
+		testHarness1.close();
+		* */
+
+		final File outDir = tempFolder.newFolder();
+
+		RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness1.setup();
+		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.close();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	static class ValidatingRollingSink<T> extends RollingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		ValidatingRollingSink(String basePath) {
+			super(basePath);
+		}
+
+		@Override
+		public void restoreState(BucketState state) throws Exception {
+
+			/**
+			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+			 * 					validLength=6
+			 * pendingForNextCheckpoint=[]
+			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+			 * */
+
+			String current = state.currentFile;
+			long validLength = state.currentFileValidLength;
+
+			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+			Assert.assertEquals(6, validLength);
+
+			List<String> pendingFiles = state.pendingFiles;
+			Assert.assertTrue(pendingFiles.isEmpty());
+
+			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+				long checkpoint = entry.getKey();
+				List<String> files = entry.getValue();
+
+				Assert.assertEquals(0L, checkpoint);
+				Assert.assertEquals(4, files.size());
+
+				for (int i = 0; i < 4; i++) {
+					Assert.assertEquals(
+						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+						files.get(i));
+				}
+			}
+			super.restoreState(state);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..2ebd70a
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ


Mime
View raw message