From: aljoscha@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Mailing-List: contact commits-help@flink.apache.org; run by ezmlm
Date: Tue, 13 Dec 2016 12:44:21 -0000
In-Reply-To: <5aa9ae1a85af4e4985619380949e1b7b@git.apache.org>
References: <5aa9ae1a85af4e4985619380949e1b7b@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [4/4] flink git commit: [FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.

[FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.
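The "new state abstractions" referred to above are the CheckpointedFunction interface and the operator ListState it exposes, which the diff below switches to from the old Checkpointed<Long> interface. The following is a minimal, illustrative sketch of that pattern, not part of this commit (the class name, state name, and emitted values are made up); it keeps a single Long in operator list state the same way the ported function keeps its global modification time:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class SingleLongStateSource extends RichSourceFunction<Long> implements CheckpointedFunction {

	private static final long serialVersionUID = 1L;

	private volatile boolean isRunning = true;

	// the value that is checkpointed; Long.MIN_VALUE means "nothing seen yet"
	private volatile long lastValue = Long.MIN_VALUE;

	// operator state handle; transient because it is re-created in initializeState()
	private transient ListState<Long> checkpointedState;

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		checkpointedState = context.getOperatorStateStore().getOperatorState(
			new ListStateDescriptor<>("single-long-state", LongSerializer.INSTANCE));

		if (context.isRestored()) {
			// with parallelism 1 the restored list contains at most one entry
			for (Long value : checkpointedState.get()) {
				lastValue = value;
			}
		}
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// replace the previously stored value with the current one
		checkpointedState.clear();
		checkpointedState.add(lastValue);
	}

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		while (isRunning) {
			// update the checkpointed field and emit under the checkpoint lock
			synchronized (ctx.getCheckpointLock()) {
				lastValue = System.currentTimeMillis();
				ctx.collect(lastValue);
			}
			Thread.sleep(100);
		}
	}

	@Override
	public void cancel() {
		isRunning = false;
	}
}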
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/685c4f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/685c4f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/685c4f83

Branch: refs/heads/master
Commit: 685c4f836bdb79181fd1f62642736606eb81d847
Parents: 3698379
Author: kl0u
Authored: Thu Nov 17 14:54:08 2016 +0100
Committer: Aljoscha Krettek
Committed: Tue Dec 13 13:38:18 2016 +0100

----------------------------------------------------------------------
 .../ContinuousFileProcessingITCase.java         |  2 +-
 .../hdfstests/ContinuousFileProcessingTest.java | 95 ++++++++++++++++++--
 .../environment/StreamExecutionEnvironment.java |  4 +-
 .../ContinuousFileMonitoringFunction.java       | 79 +++++++++++++---
 4 files changed, 157 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index 3211a20..df68a76 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -124,7 +124,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
 		env.setParallelism(PARALLELISM);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(), INTERVAL);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 6454c11..0cb1bad 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -35,9 +35,11 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOpera
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -117,10 +119,10 @@ public class ContinuousFileProcessingTest {
 	public void testInvalidPathSpecification() throws Exception {
 
 		String invalidPath = "hdfs://" +
 			hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(invalidPath));
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, invalidPath,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		try {
 			monitoringFunction.run(new DummySourceContext() {
@@ -135,7 +137,7 @@ public class ContinuousFileProcessingTest {
 			Assert.fail("Test passed with an invalid path.");
 		} catch (FileNotFoundException e) {
-			Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+			Assert.assertEquals("The provided file path " + format.getFilePath().toString() + " does not exist.", e.getMessage());
 		}
 	}
 
@@ -491,6 +493,8 @@ public class ContinuousFileProcessingTest {
 
 	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
 
+		private static final long serialVersionUID = -6727603565381560267L;
+
 		private final OneShotLatch latch;
 
 		private FileInputSplit split;
@@ -556,6 +560,9 @@ public class ContinuousFileProcessingTest {
 		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
 		format.setFilesFilter(new FilePathFilter() {
+
+			private static final long serialVersionUID = 2611449927338589804L;
+
 			@Override
 			public boolean filterPath(Path filePath) {
 				return filePath.getName().startsWith("**");
@@ -563,7 +570,7 @@ public class ContinuousFileProcessingTest {
 		});
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		final FileVerifyingSourceContext context =
@@ -601,7 +608,7 @@ public class ContinuousFileProcessingTest {
 		FileInputSplit[] splits = format.createInputSplits(1);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
@@ -633,7 +640,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		final FileVerifyingSourceContext context =
 			new FileVerifyingSourceContext(latch, monitoringFunction);
@@ -683,6 +690,80 @@ public class ContinuousFileProcessingTest {
 	}
 
 	@Test
+	public void testFunctionRestore() throws Exception {
+
+		org.apache.hadoop.fs.Path path = null;
+		long fileModTime = Long.MIN_VALUE;
+		for (int i = 0; i < 1; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			path = file.f0;
+			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(TimestampedFileInputSplit element) {
+							latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+		monitoringFunction.cancel();
+		runner.join();
+
+		testHarness.close();
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
+			new StreamSource<>(monitoringFunctionCopy);
+
+		AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
+			new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+		testHarnessCopy.initializeState(snapshot);
+		testHarnessCopy.open();
+
+		Assert.assertNull(error[0]);
+		Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());
+
+		hdfs.delete(path, false);
+	}
+
+	@Test
 	public void testProcessContinuously() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
@@ -698,7 +779,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
 
 		final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 08e17a1..99784e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1351,9 +1351,7 @@ public abstract class StreamExecutionEnvironment {
 				ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
 
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(
-				inputFormat, inputFormat.getFilePath().toString(),
-				monitoringMode, getParallelism(), interval);
+			new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
 
 		ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 54ab0ab..8723853 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -17,14 +17,20 @@ package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +64,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<TimestampedFileInputSplit> implements Checkpointed<Long> {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = 1L;
 
@@ -92,10 +98,13 @@ public class ContinuousFileMonitoringFunction<OUT>
 	private volatile boolean isRunning = true;
 
+	private transient ListState<Long> checkpointedState;
+
 	public ContinuousFileMonitoringFunction(
-		FileInputFormat<? extends OUT> format, String path,
+		FileInputFormat<? extends OUT> format,
 		FileProcessingMode watchType,
-		int readerParallelism, long interval) {
+		int readerParallelism,
+		long interval) {
 
 		Preconditions.checkArgument(
 			watchType == FileProcessingMode.PROCESS_ONCE || interval >= MIN_MONITORING_INTERVAL,
@@ -104,7 +113,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 		);
 
 		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
-		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+		this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path.");
 
 		this.interval = interval;
 		this.watchType = watchType;
@@ -112,13 +121,56 @@ public class ContinuousFileMonitoringFunction<OUT>
 		this.globalModificationTime = Long.MIN_VALUE;
 	}
 
+	@VisibleForTesting
+	public long getGlobalModificationTime() {
+		return this.globalModificationTime;
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+
+		Preconditions.checkState(this.checkpointedState == null,
+			"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
+			new ListStateDescriptor<>(
+				"file-monitoring-state",
+				LongSerializer.INSTANCE
+			)
+		);
+
+		if (context.isRestored()) {
+			LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+			List<Long> retrievedStates = new ArrayList<>();
+			for (Long entry : this.checkpointedState.get()) {
+				retrievedStates.add(entry);
+			}
+
+			// given that the parallelism of the function is 1, we can only have 1 state
+			Preconditions.checkArgument(retrievedStates.size() == 1,
+				getClass().getSimpleName() + " retrieved invalid state.");
+
+			this.globalModificationTime = retrievedStates.get(0);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} retrieved a global mod time of {}.",
+					getClass().getSimpleName(), globalModificationTime);
+			}
+
+		} else {
+			LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+		}
+	}
+
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		format.configure(parameters);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Opened File Monitoring Source for path: " + path + ".");
+			LOG.debug("Opened {} (taskIdx= {}) for path: {}",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), path);
 		}
 	}
 
@@ -294,12 +346,15 @@ public class ContinuousFileMonitoringFunction<OUT>
 	// ---------------------  Checkpointing  --------------------------
 
 	@Override
-	public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		return this.globalModificationTime;
-	}
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState != null,
+			"The " + getClass().getSimpleName() + " state has not been properly initialized.");
 
-	@Override
-	public void restoreState(Long state) throws Exception {
-		this.globalModificationTime = state;
+		this.checkpointedState.clear();
+		this.checkpointedState.add(this.globalModificationTime);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
+		}
 	}
 }
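For callers, the visible effect of this commit is the slimmer constructor: the monitored path is now taken from the input format itself instead of being passed as a separate String. A hedged usage sketch follows (the wrapper class, directory, and interval value are illustrative only; in practice StreamExecutionEnvironment#readFile builds the function internally, as the StreamExecutionEnvironment hunk above shows):

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitoringFunctionUsageSketch {

	public static ContinuousFileMonitoringFunction<String> create() {
		// The monitored directory and the interval are illustrative values.
		TextInputFormat format = new TextInputFormat(new Path("hdfs:///monitored/dir"));
		format.setFilesFilter(FilePathFilter.createDefaultFilter());

		// No separate path argument any more: the function derives it from format.getFilePath().
		return new ContinuousFileMonitoringFunction<>(
			format,
			FileProcessingMode.PROCESS_CONTINUOUSLY,
			1,        // reader parallelism, forwarded to the downstream readers
			1000L);   // monitoring interval in ms; must be >= MIN_MONITORING_INTERVAL for PROCESS_CONTINUOUSLY
	}
}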