flink-commits mailing list archives

From aljos...@apache.org
Subject [15/15] flink git commit: [FLINK-5317] Make the continuous file processing backwards compatible w/ unit tests.
Date Tue, 20 Dec 2016 15:09:30 GMT
[FLINK-5317] Make the continuous file processing backwards compatible w/ unit tests.

This includes both the ContinuousFileMonitoringFunction and the
ContinuousFileReaderOperator.
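
For reference, the migration tests added below share one pattern: build the operator, wrap it in a test harness, restore the harness from a Flink 1.1 snapshot shipped as a test resource, and then assert on the restored state. The following is a minimal, hypothetical sketch of that pattern for the monitoring function; the input path, snapshot path, and 100 ms interval are placeholders, not the values used in the actual test below.

	import org.apache.flink.api.java.io.TextInputFormat;
	import org.apache.flink.core.fs.Path;
	import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
	import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
	import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
	import org.apache.flink.streaming.api.operators.StreamSource;
	import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;

	public class MonitoringFunctionMigrationSketch {

		public static void main(String[] args) throws Exception {
			// Format and monitoring function under test (parallelism 1, placeholder 100 ms interval).
			TextInputFormat format = new TextInputFormat(new Path("hdfs:///tmp/input"));
			ContinuousFileMonitoringFunction<String> monitoringFunction =
				new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, 100);

			StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
				new StreamSource<>(monitoringFunction);

			// Single-subtask harness; the integer arguments mirror the test below.
			AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> harness =
				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

			harness.setup();
			// Restore from a pre-1.2 snapshot file; the path is a placeholder.
			harness.initializeStateFromLegacyCheckpoint("/path/to/monitoring-function-flink1.1-snapshot");
			harness.open();

			// After open(), the function exposes the modification time recovered from the legacy state.
			System.out.println(monitoringFunction.getGlobalModificationTime());
		}
	}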


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1220230c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1220230c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1220230c

Branch: refs/heads/master
Commit: 1220230c624234a5fd5e2ef5855aebb294184462
Parents: 2cbd9f5
Author: kl0u <kkloudas@gmail.com>
Authored: Fri Dec 16 17:52:06 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Dec 20 16:02:34 2016 +0100

----------------------------------------------------------------------
 .../ContinuousFileProcessingMigrationTest.java  | 402 +++++++++++++++++++
 ...gration-test-1482144479339-flink1.1-snapshot | Bin 0 -> 468 bytes
 .../reader-migration-test-flink1.1-snapshot     | Bin 0 -> 979 bytes
 .../ContinuousFileMonitoringFunction.java       |  35 +-
 .../source/ContinuousFileReaderOperator.java    | 125 +++++-
 5 files changed, 536 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
new file mode 100644
index 0000000..0915005
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+public class ContinuousFileProcessingMigrationTest {
+
+	private static final int NO_OF_FILES = 5;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private static File baseDir;
+
+	private static FileSystem hdfs;
+	private static String hdfsURI;
+	private static MiniDFSCluster hdfsCluster;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	//						PREPARING FOR THE TESTS
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = tempFolder.newFolder().getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			Configuration hdConf = new Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = WindowOperatorTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	//						END OF PREPARATIONS
+
+	//						TESTS
+
+	@Test
+	public void testReaderSnapshotRestore() throws Exception {
+
+		/*
+
+		FileInputSplit split1 =
+			new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
+		FileInputSplit split2 =
+			new FileInputSplit(2, new Path("test/test2"), 101, 200, null);
+		FileInputSplit split3 =
+			new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
+		FileInputSplit split4 =
+			new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
+
+		final OneShotLatch latch = new OneShotLatch();
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+		OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance =
+			new OneInputStreamOperatorTestHarness<>(initReader);
+		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		initTestInstance.open();
+		// create some state in the reader
+		initTestInstance.processElement(new StreamRecord<>(split1));
+		initTestInstance.processElement(new StreamRecord<>(split2));
+		initTestInstance.processElement(new StreamRecord<>(split3));
+		initTestInstance.processElement(new StreamRecord<>(split4));
+		// take a snapshot of the operator's state. This will be used
+		// to initialize another reader and compare the results of the
+		// two operators.
+		final StreamTaskState snapshot;
+		synchronized (initTestInstance.getCheckpointLock()) {
+			snapshot = initTestInstance.snapshot(0L, 0L);
+		}
+
+		initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot");
+
+		*/
+		TimestampedFileInputSplit split1 =
+			new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+		TimestampedFileInputSplit split2 =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+		TimestampedFileInputSplit split3 =
+			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit split4 =
+			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
+			new OneInputStreamOperatorTestHarness<>(initReader);
+		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		initTestInstance.setup();
+		initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot"));
+		initTestInstance.open();
+
+		latch.trigger();
+
+		// ... and wait for the operators to close gracefully
+
+		synchronized (initTestInstance.getCheckpointLock()) {
+			initTestInstance.close();
+		}
+
+		FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
+		FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
+		FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
+		FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
+
+		// compare if the results contain what they should contain and also if
+		// they are the same, as they should.
+
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
+	}
+
+	private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+		Preconditions.checkNotNull(split);
+
+		return new FileInputSplit(
+			split.getSplitNumber(),
+			split.getPath(),
+			split.getStart(),
+			split.getLength(),
+			split.getHostnames()
+		);
+	}
+
+	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+
+		private static final long serialVersionUID = -6727603565381560267L;
+
+		private final OneShotLatch latch;
+
+		private FileInputSplit split;
+
+		private boolean reachedEnd;
+
+		BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+			super(filePath);
+			this.latch = latch;
+			this.reachedEnd = false;
+		}
+
+		@Override
+		public void open(FileInputSplit fileSplit) throws IOException {
+			this.split = fileSplit;
+			this.reachedEnd = false;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			if (!latch.isTriggered()) {
+				try {
+					latch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+			return reachedEnd;
+		}
+
+		@Override
+		public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+			this.reachedEnd = true;
+			return split;
+		}
+
+		@Override
+		public void close() {
+
+		}
+	}
+
+	////				Monitoring Function Tests				//////
+
+	@Test
+	public void testFunctionRestore() throws Exception {
+
+		/*
+		org.apache.hadoop.fs.Path path = null;
+		long fileModTime = Long.MIN_VALUE;
+		for (int i = 0; i < 1; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			path = file.f0;
+			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
+			new OneInputStreamOperatorTestHarness<>(src);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(FileInputSplit element) {
+							latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		StreamTaskState snapshot = testHarness.snapshot(0, 0);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
+		monitoringFunction.cancel();
+		runner.join();
+
+		testHarness.close();
+		*/
+
+		Long expectedModTime = Long.parseLong("1482144479339");
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
+		testHarness.open();
+
+		Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
+
+	}
+
+	///////////				Source Contexts Used by the tests				/////////////////
+
+	private static abstract class DummySourceContext
+		implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/////////				Auxiliary Methods				/////////////
+
+	/**
+	 * Create a file with pre-determined String format of the form:
+	 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
+	 * */
+	private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+		Assert.assertFalse(hdfs.exists(file));
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot
new file mode 100644
index 0000000..17eba99
Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..e47ebbd
Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8723853..1ec9a70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -92,7 +93,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	private final FileProcessingMode watchType;
 
 	/** The maximum file modification time seen so far. */
-	private volatile long globalModificationTime;
+	private volatile long globalModificationTime = Long.MIN_VALUE;
 
 	private transient Object checkpointLock;
 
@@ -147,15 +148,25 @@ public class ContinuousFileMonitoringFunction<OUT>
 				retrievedStates.add(entry);
 			}
 
-			// given that the parallelism of the function is 1, we can only have 1 state
-			Preconditions.checkArgument(retrievedStates.size() == 1,
+			// given that the parallelism of the function is 1, we can only have 1 or 0 retrieved items.
+			// the 0 is for the case that we are migrating from a previous Flink version.
+
+			Preconditions.checkArgument(retrievedStates.size() <= 1,
 				getClass().getSimpleName() + " retrieved invalid state.");
 
-			this.globalModificationTime = retrievedStates.get(0);
+			if (retrievedStates.size() == 1 && globalModificationTime != Long.MIN_VALUE) {
+				// this is the case where we have both legacy and new state.
+				// The two should be mutually exclusive for the operator, thus we throw the exception.
+
+				throw new IllegalArgumentException(
+					"The " + getClass().getSimpleName() +" has already restored from a previous Flink version.");
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} retrieved a global mod time of {}.",
-					getClass().getSimpleName(), globalModificationTime);
+			} else if (retrievedStates.size() == 1) {
+				this.globalModificationTime = retrievedStates.get(0);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} retrieved a global mod time of {}.",
+						getClass().getSimpleName(), globalModificationTime);
+				}
 			}
 
 		} else {
@@ -357,4 +368,12 @@ public class ContinuousFileMonitoringFunction<OUT>
 			LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
 		}
 	}
+
+	@Override
+	public void restoreState(Long state) throws Exception {
+		this.globalModificationTime = state;
+
+		LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index bbe1ea5..6419aa6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -24,22 +24,29 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -59,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator {
 
 	private static final long serialVersionUID = 1L;
 
@@ -73,8 +80,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	private transient SplitReader<OUT> reader;
 	private transient SourceFunction.SourceContext<OUT> readerContext;
 
-	private ListState<TimestampedFileInputSplit> checkpointedState;
-	private List<TimestampedFileInputSplit> restoredReaderState;
+	private transient ListState<TimestampedFileInputSplit> checkpointedState;
+	private transient List<TimestampedFileInputSplit> restoredReaderState;
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
 		this.format = checkNotNull(format);
@@ -89,25 +96,27 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
-		checkState(this.checkpointedState == null && this.restoredReaderState == null,
-			"The reader state has already been initialized.");
+		checkState(checkpointedState == null,	"The reader state has already been initialized.");
 
 		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
-			LOG.info("Restoring state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
 
-			this.restoredReaderState = new ArrayList<>();
-			for (TimestampedFileInputSplit split : this.checkpointedState.get()) {
-				this.restoredReaderState.add(split);
-			}
+			// this may not be null in case we migrate from a previous Flink version.
+			if (restoredReaderState == null) {
+				restoredReaderState = new ArrayList<>();
+				for (TimestampedFileInputSplit split : checkpointedState.get()) {
+					restoredReaderState.add(split);
+				}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("ContinuousFileReaderOperator idx {} restored {}.", subtaskIdx, this.restoredReaderState);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
+				}
 			}
 		} else {
-			LOG.info("No state to restore for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+			LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
 		}
 	}
 
@@ -379,20 +388,100 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void snapshotState(StateSnapshotContext context) throws Exception {
 		super.snapshotState(context);
 
-		checkState(this.checkpointedState != null,
+		checkState(checkpointedState != null,
 			"The operator state has not been properly initialized.");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 
-		this.checkpointedState.clear();
-		List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
+		checkpointedState.clear();
+		List<TimestampedFileInputSplit> readerState = reader.getReaderState();
 		for (TimestampedFileInputSplit split : readerState) {
 			// create a new partition for each entry.
-			this.checkpointedState.add(split);
+			checkpointedState.add(split);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
+				getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Restoring / Migrating from an older Flink version.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+
+		LOG.info("{} (taskIdx={}) restoring state from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		// this is just to read the byte indicating if we have udf state or not
+		int hasUdfState = in.read();
+
+		Preconditions.checkArgument(hasUdfState == 0);
+
+		final ObjectInputStream ois = new ObjectInputStream(in);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
+
+		// read the split that was being read
+		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+
+		// read the pending splits list
+		List<FileInputSplit> pendingSplits = new LinkedList<>();
+		int noOfSplits = div.readInt();
+		for (int i = 0; i < noOfSplits; i++) {
+			FileInputSplit split = (FileInputSplit) ois.readObject();
+			pendingSplits.add(split);
+		}
+
+		// read the state of the format
+		Serializable formatState = (Serializable) ois.readObject();
+
+		div.close();
+
+		if (restoredReaderState == null) {
+			restoredReaderState = new ArrayList<>();
+		}
+
+		// we do not know the modification time of the retrieved splits, so we assign them
+		// artificial ones, with the only constraint that they respect the relative order of the
+		// retrieved splits, because modification time is going to be used to sort the splits within
+		// the "pending splits" priority queue.
+
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		long runningModTime = Math.max(now, noOfSplits + 1);
+
+		TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState);
+		restoredReaderState.add(currentSplit);
+		for (FileInputSplit split : pendingSplits) {
+			TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime);
+			restoredReaderState.add(timestampedSplit);
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ContinuousFileReaderOperator idx {} checkpointed {}.", subtaskIdx, readerState);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.",
+					getClass().getSimpleName(),
+					getRuntimeContext().getIndexOfThisSubtask(),
+					restoredReaderState.size(),
+					restoredReaderState);
+			}
+		}
+	}
+
+	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
+		return createTimestampedFileSplit(split, modificationTime, null);
+	}
+
+	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
+		TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
+			modificationTime, split.getSplitNumber(), split.getPath(),
+			split.getStart(), split.getLength(), split.getHostnames());
+
+		if (state != null) {
+			timestampedSplit.setSplitState(state);
 		}
+		return timestampedSplit;
 	}
 }

