From: aljoscha@apache.org
To: commits@flink.apache.org
Date: Tue, 14 Jun 2016 16:12:36 -0000
Subject: [2/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
-	 * read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The file will be read with the system's default character set.
+	 *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: The source monitors the path, creates the + * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, + * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data, + * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint + * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point. * * @param filePath * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). * @return The data stream that represents the data read from the given file as text lines */ public DataStreamSource readTextFile(String filePath) { - Preconditions.checkNotNull(filePath, "The file path may not be null."); - TextInputFormat format = new TextInputFormat(new Path(filePath)); - TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; - - return createInput(format, typeInfo, "Read Text File Source"); + return readTextFile(filePath, "UTF-8"); } /** - * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link - * java.nio.charset.Charset} with the given name will be used to read the files. + * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such + * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files. + * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: The source monitors the path, creates the + * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, + * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data, + * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint + * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point. * * @param filePath * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") @@ -902,15 +916,32 @@ public abstract class StreamExecutionEnvironment { */ public DataStreamSource readTextFile(String filePath, String charsetName) { Preconditions.checkNotNull(filePath, "The file path may not be null."); + TextInputFormat format = new TextInputFormat(new Path(filePath)); TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; format.setCharsetName(charsetName); - return createInput(format, typeInfo, "Read Text File Source"); + return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, + FilePathFilter.createDefaultFilter(), typeInfo); } /** - * Reads the given file with the given imput format. + * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. + * + *
+	 * <p/>
+ * Since all data streams need specific information about their types, this method needs to determine the + * type of the data produced by the input format. It will attempt to determine the data type by reflection, + * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. + * In the latter case, this method will invoke the + * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data + * type produced by the input format. + * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: The source monitors the path, creates the + * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, + * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data, + * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint + * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point. * * @param filePath * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") @@ -920,19 +951,64 @@ public abstract class StreamExecutionEnvironment { * The type of the returned data stream * @return The data stream that represents the data read from the given file */ - public DataStreamSource readFile(FileInputFormat inputFormat, String filePath) { - Preconditions.checkNotNull(inputFormat, "InputFormat must not be null."); - Preconditions.checkNotNull(filePath, "The file path must not be null."); + public DataStreamSource readFile(FileInputFormat inputFormat, + String filePath) { + return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter()); + } - inputFormat.setFilePath(new Path(filePath)); + /** + * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending + * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path + * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and + * exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, the user + * can specify a custom {@link FilePathFilter}. As a default implementation you can use + * {@link FilePathFilter#createDefaultFilter()}. + * + *
+	 * <p/>
+ * Since all data streams need specific information about their types, this method needs to determine the + * type of the data produced by the input format. It will attempt to determine the data type by reflection, + * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. + * In the latter case, this method will invoke the + * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data + * type produced by the input format. + * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE}, + * the source monitors the path once, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} + * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data, + * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers + * are going to be forwarded after the source exits, thus having no checkpoints after that point. + * + * @param inputFormat + * The input format used to create the data stream + * @param filePath + * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") + * @param watchType + * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit + * @param interval + * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans + * @param filter + * The files to be excluded from the processing + * @param + * The type of the returned data stream + * @return The data stream that represents the data read from the given file + */ + @PublicEvolving + public DataStreamSource readFile(FileInputFormat inputFormat, + String filePath, + FileProcessingMode watchType, + long interval, + FilePathFilter filter) { + + TypeInformation typeInformation; try { - return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Read File source"); + typeInformation = TypeExtractor.getInputFormatTypes(inputFormat); } catch (Exception e) { - throw new InvalidProgramException("The type returned by the input format could not be automatically " + - "determined. " + - "Please specify the TypeInformation of the produced type explicitly by using the " + - "'createInput(InputFormat, TypeInformation)' method instead."); + throw new InvalidProgramException("The type returned by the input format could not be " + + "automatically determined. Please specify the TypeInformation of the produced type " + + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); } + return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation); } /** @@ -952,15 +1028,62 @@ public abstract class StreamExecutionEnvironment { * of files. * @return The DataStream containing the given directory. */ + @Deprecated public DataStream readFileStream(String filePath, long intervalMillis, - WatchType watchType) { + FileMonitoringFunction.WatchType watchType) { DataStream> source = addSource(new FileMonitoringFunction( - filePath, intervalMillis, watchType), "Read File Stream source"); + filePath, intervalMillis, watchType), "Read File Stream source"); return source.flatMap(new FileReadFunction()); } /** + * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. + * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) + * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the + * path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, + * the user can specify a custom {@link FilePathFilter}. As a default implementation you can use + * {@link FilePathFilter#createDefaultFilter()}. + * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE}, + * the source monitors the path once, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} + * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data, + * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers + * are going to be forwarded after the source exits, thus having no checkpoints after that point. + * + * @param inputFormat + * The input format used to create the data stream + * @param filePath + * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") + * @param watchType + * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit + * @param filter + * The files to be excluded from the processing + * @param typeInformation + * Information on the type of the elements in the output stream + * @param interval + * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans + * @param + * The type of the returned data stream + * @return The data stream that represents the data read from the given file + */ + @PublicEvolving + public DataStreamSource readFile(FileInputFormat inputFormat, + String filePath, + FileProcessingMode watchType, + long interval, + FilePathFilter filter, + TypeInformation typeInformation) { + + Preconditions.checkNotNull(inputFormat, "InputFormat must not be null."); + Preconditions.checkNotNull(filePath, "The file path must not be null."); + + inputFormat.setFilePath(filePath); + return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval); + } + + /** * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are * decoded by the system's default character set. On the termination of the socket server connection retries can be * initiated. @@ -1026,12 +1149,20 @@ public abstract class StreamExecutionEnvironment { /** * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. *
 	 * <p/>
- * Since all data streams need specific information about their types, this method needs to determine the type of - * the data produced by the input format. It will attempt to determine the data type by reflection, unless the - * input - * format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the latter - * case, this method will invoke the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} - * method to determine data type produced by the input format. + * Since all data streams need specific information about their types, this method needs to determine the + * type of the data produced by the input format. It will attempt to determine the data type by reflection, + * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. + * In the latter case, this method will invoke the + * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data + * type produced by the input format. + * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source + * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the + * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards + * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits, + * without waiting for the readers to finish reading. This implies that no more checkpoint + * barriers are going to be forwarded after the source exits, thus having no checkpoints. * * @param inputFormat * The input format used to create the data stream @@ -1041,35 +1172,84 @@ public abstract class StreamExecutionEnvironment { */ @PublicEvolving public DataStreamSource createInput(InputFormat inputFormat) { - return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source"); + return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); } /** * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. *
 	 * <p/>
- * The data stream is typed to the given TypeInformation. This method is intended for input formats where the - * return - * type cannot be determined by reflection analysis, and that do not implement the + * The data stream is typed to the given TypeInformation. This method is intended for input formats + * where the return type cannot be determined by reflection analysis, and that do not implement the * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. * + *
+	 * <p/>
+ * NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source + * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the + * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards + * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits, + * without waiting for the readers to finish reading. This implies that no more checkpoint + * barriers are going to be forwarded after the source exits, thus having no checkpoints. + * * @param inputFormat * The input format used to create the data stream + * @param typeInfo + * The information about the type of the output type * @param * The type of the returned data stream * @return The data stream that represents the data created by the input format */ @PublicEvolving public DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo) { - return createInput(inputFormat, typeInfo, "Custom File source"); + DataStreamSource source; + + if (inputFormat instanceof FileInputFormat) { + FileInputFormat format = (FileInputFormat) inputFormat; + source = createFileInput(format, typeInfo, "Custom File source", + FileProcessingMode.PROCESS_ONCE, + FilePathFilter.createDefaultFilter(), -1); + } else { + source = createInput(inputFormat, typeInfo, "Custom Source"); + } + return source; } - // private helper for passing different names private DataStreamSource createInput(InputFormat inputFormat, - TypeInformation typeInfo, String sourceName) { - FileSourceFunction function = new FileSourceFunction<>(inputFormat, typeInfo); + TypeInformation typeInfo, + String sourceName) { + + InputFormatSource function = new InputFormatSource<>(inputFormat, typeInfo); return addSource(function, sourceName, typeInfo); } + private DataStreamSource createFileInput(FileInputFormat inputFormat, + TypeInformation typeInfo, + String sourceName, + FileProcessingMode watchType, + FilePathFilter pathFilter, + long interval) { + + Preconditions.checkNotNull(inputFormat, "Unspecified file input format."); + Preconditions.checkNotNull(typeInfo, "Unspecified output type information."); + Preconditions.checkNotNull(sourceName, "Unspecified name for the source."); + Preconditions.checkNotNull(watchType, "Unspecified watchtype."); + Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function."); + + Preconditions.checkArgument(watchType.equals(FileProcessingMode.PROCESS_ONCE) || + interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, + "The path monitoring interval cannot be less than 100 ms."); + + ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>( + inputFormat, inputFormat.getFilePath().toString(), + pathFilter, watchType, getParallelism(), interval); + + ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(inputFormat); + + SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName) + .transform("FileSplitReader_" + sourceName, typeInfo, reader); + + return new DataStreamSource<>(source); + } + /** * Adds a Data Source to the streaming topology. 
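
A minimal usage sketch of the API added above (the path and the interval are
illustrative; TextInputFormat and Path are pre-existing Flink classes, while
readFile, FileProcessingMode, and FilePathFilter are introduced by this commit):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // TextInputFormat emits one String per line of each file under the path.
    TextInputFormat format = new TextInputFormat(new Path("file:///tmp/dir"));

    // Re-scan the directory every 100 ms (the minimum allowed interval) and forward
    // splits of files whose modification time is newer than any processed so far.
    DataStream<String> lines = env.readFile(
        format,
        "file:///tmp/dir",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        100L,
        FilePathFilter.createDefaultFilter());

    // With PROCESS_ONCE the source instead reads what is currently in the path and
    // exits, after which no further checkpoint barriers are emitted by the source.
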
* http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java new file mode 100644 index 0000000..b97c274 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for + * i) monitoring a user-provided path, ii) deciding which files should be further read and processed, + * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning + * them to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link FileProcessingMode} and the {@link FilePathFilter}. + * The splits of the files to be read are then forwarded to the downstream + * {@link ContinuousFileReaderOperator} which can have parallelism greater than one. + */ +@Internal +public class ContinuousFileMonitoringFunction + extends RichSourceFunction implements Checkpointed>>, Tuple2>, Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); + + /** + * The minimum interval allowed between consecutive path scans. 
This is applicable if the + * {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. + */ + public static final long MIN_MONITORING_INTERVAL = 100l; + + /** The path to monitor. */ + private final String path; + + /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + private final int readerParallelism; + + /** The {@link FileInputFormat} to be read. */ + private FileInputFormat format; + + /** How often to monitor the state of the directory for new data. */ + private final long interval; + + /** Which new data to process (see {@link FileProcessingMode}. */ + private final FileProcessingMode watchType; + + private List>> splitsToFwdOrderedAscByModTime; + + private Tuple2> currentSplitsToFwd; + + private long globalModificationTime; + + private FilePathFilter pathFilter; + + private volatile boolean isRunning = true; + + public ContinuousFileMonitoringFunction( + FileInputFormat format, String path, + FilePathFilter filter, FileProcessingMode watchType, + int readerParallelism, long interval) { + + if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) { + throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is " + + "smaller than the minimum allowed one (100 ms)."); + } + this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format."); + this.path = Preconditions.checkNotNull(path, "Unspecified Path."); + this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter."); + + this.interval = interval; + this.watchType = watchType; + this.readerParallelism = Math.max(readerParallelism, 1); + this.globalModificationTime = Long.MIN_VALUE; + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + LOG.info("Opening File Monitoring Source."); + + super.open(parameters); + format.configure(parameters); + } + + @Override + public void run(SourceFunction.SourceContext context) throws Exception { + FileSystem fileSystem = FileSystem.get(new URI(path)); + + switch (watchType) { + case PROCESS_CONTINUOUSLY: + while (isRunning) { + monitorDirAndForwardSplits(fileSystem, context); + Thread.sleep(interval); + } + isRunning = false; + break; + case PROCESS_ONCE: + monitorDirAndForwardSplits(fileSystem, context); + isRunning = false; + break; + default: + isRunning = false; + throw new RuntimeException("Unknown WatchType" + watchType); + } + } + + private void monitorDirAndForwardSplits(FileSystem fs, SourceContext context) throws IOException, JobException { + final Object lock = context.getCheckpointLock(); + + // it may be non-null in the case of a recovery after a failure. + if (currentSplitsToFwd != null) { + synchronized (lock) { + forwardSplits(currentSplitsToFwd, context); + } + } + currentSplitsToFwd = null; + + // it may be non-null in the case of a recovery after a failure. + if (splitsToFwdOrderedAscByModTime == null) { + splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs); + } + + Iterator>> it = + splitsToFwdOrderedAscByModTime.iterator(); + + while (it.hasNext()) { + synchronized (lock) { + currentSplitsToFwd = it.next(); + it.remove(); + forwardSplits(currentSplitsToFwd, context); + } + } + + // set them to null to distinguish from a restore. 
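+		// (after a restore both fields are non-null, so the checks at the top of this
+		// method first drain the recovered splits before the directory is scanned again)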
+ splitsToFwdOrderedAscByModTime = null; + currentSplitsToFwd = null; + } + + private void forwardSplits(Tuple2> splitsToFwd, SourceContext context) { + currentSplitsToFwd = splitsToFwd; + Long modTime = currentSplitsToFwd.f0; + List splits = currentSplitsToFwd.f1; + + Iterator it = splits.iterator(); + while (it.hasNext()) { + FileInputSplit split = it.next(); + processSplit(split, context); + it.remove(); + } + + // update the global modification time + if (modTime >= globalModificationTime) { + globalModificationTime = modTime; + } + } + + private void processSplit(FileInputSplit split, SourceContext context) { + LOG.info("Forwarding split: " + split); + context.collect(split); + } + + private List>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException { + List eligibleFiles = listEligibleFiles(fileSystem); + if (eligibleFiles.isEmpty()) { + return new ArrayList<>(); + } + + Map> splitsToForward = getInputSplits(eligibleFiles); + List>> sortedSplitsToForward = new ArrayList<>(); + + for (Map.Entry> entry : splitsToForward.entrySet()) { + sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue())); + } + + Collections.sort(sortedSplitsToForward, new Comparator>>() { + @Override + public int compare(Tuple2> o1, Tuple2> o2) { + return (int) (o1.f0 - o2.f0); + } + }); + + return sortedSplitsToForward; + } + + /** + * Creates the input splits for the path to be forwarded to the downstream tasks of the + * {@link ContinuousFileReaderOperator}. Those tasks are going to read their contents for further + * processing. Splits belonging to files in the {@code eligibleFiles} list are the ones + * that are shipped for further processing. + * @param eligibleFiles The files to process. + */ + private Map> getInputSplits(List eligibleFiles) throws IOException { + if (eligibleFiles.isEmpty()) { + return new HashMap<>(); + } + + FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism); + + Map> splitsPerFile = new HashMap<>(); + for (FileInputSplit split: inputSplits) { + for (FileStatus file: eligibleFiles) { + if (file.getPath().equals(split.getPath())) { + Long modTime = file.getModificationTime(); + + List splitsToForward = splitsPerFile.get(modTime); + if (splitsToForward == null) { + splitsToForward = new LinkedList<>(); + splitsPerFile.put(modTime, splitsToForward); + } + splitsToForward.add(split); + break; + } + } + } + return splitsPerFile; + } + + /** + * Returns the files that have data to be processed. This method returns the + * Paths to the aforementioned files. It is up to the {@link #processSplit(FileInputSplit, SourceContext)} + * method to decide which parts of the file to be processed, and forward them downstream. + */ + private List listEligibleFiles(FileSystem fileSystem) throws IOException { + List files = new ArrayList<>(); + + FileStatus[] statuses = fileSystem.listStatus(new Path(path)); + if (statuses == null) { + LOG.warn("Path does not exist: {}", path); + } else { + // handle the new files + for (FileStatus status : statuses) { + Path filePath = status.getPath(); + long modificationTime = status.getModificationTime(); + if (!shouldIgnore(filePath, modificationTime)) { + files.add(status); + } + } + } + return files; + } + + /** + * Returns {@code true} if the file is NOT to be processed further. 
+ * This happens in the following cases: + * + * If the user-specified path filtering method returns {@code true} for the file, + * or if the modification time of the file is smaller than the {@link #globalModificationTime}, which + * is the time of the most recent modification found in any of the already processed files. + */ + private boolean shouldIgnore(Path filePath, long modificationTime) { + boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime); + if (shouldIgnore) { + LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime); + } + return shouldIgnore; + } + + @Override + public void close() throws Exception { + super.close(); + isRunning = false; + LOG.info("Closed File Monitoring Source."); + } + + @Override + public void cancel() { + isRunning = false; + } + + // --------------------- Checkpointing -------------------------- + + @Override + public Tuple3>>, Tuple2>, Long> snapshotState( + long checkpointId, long checkpointTimestamp) throws Exception { + + if (!isRunning) { + LOG.debug("snapshotState() called on closed source"); + return null; + } + return new Tuple3<>(splitsToFwdOrderedAscByModTime, + currentSplitsToFwd, globalModificationTime); + } + + @Override + public void restoreState(Tuple3>>, + Tuple2>, Long> state) throws Exception { + + this.splitsToFwdOrderedAscByModTime = state.f0; + this.currentSplitsToFwd = state.f1; + this.globalModificationTime = state.f2; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java new file mode 100644 index 0000000..4d4a792 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from + * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors + * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another + * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from + * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()} + * so that the checkpoints reflect the current state. + */ +@Internal +public class ContinuousFileReaderOperator extends AbstractStreamOperator + implements OneInputStreamOperator, OutputTypeConfigurable { + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class); + + private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null); + + private transient SplitReader reader; + private transient TimestampedCollector collector; + + private FileInputFormat format; + private TypeSerializer serializer; + + private Object checkpointLock; + + private Tuple3, FileInputSplit, S> readerState; + + public ContinuousFileReaderOperator(FileInputFormat format) { + this.format = checkNotNull(format); + } + + @Override + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + this.serializer = outTypeInfo.createSerializer(executionConfig); + } + + @Override + public void open() throws Exception { + super.open(); + + if (this.serializer == null) { + throw new IllegalStateException("The serializer has not been set. " + + "Probably the setOutputType() was not called and this should not have happened. 
" + + "Please report it."); + } + + this.format.configure(new Configuration()); + this.collector = new TimestampedCollector<>(output); + this.checkpointLock = getContainingTask().getCheckpointLock(); + + this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + reader.addSplit(element.getValue()); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void dispose() { + super.dispose(); + + // first try to cancel it properly and + // give it some time until it finishes + reader.cancel(); + try { + reader.join(200); + } catch (InterruptedException e) { + // we can ignore this + } + + // if the above did not work, then interrupt the thread repeatedly + while (reader.isAlive()) { + + StringBuilder bld = new StringBuilder(); + StackTraceElement[] stack = reader.getStackTrace(); + for (StackTraceElement e : stack) { + bld.append(e).append('\n'); + } + LOG.warn("The reader is stuck in method:\n {}", bld.toString()); + + reader.interrupt(); + try { + reader.join(50); + } catch (InterruptedException e) { + // we can ignore this + } + } + reader = null; + collector = null; + format = null; + serializer = null; + } + + @Override + public void close() throws Exception { + super.close(); + + // signal that no more splits will come, wait for the reader to finish + // and close the collector. Further cleaning up is handled by the dispose(). + + if (reader != null && reader.isAlive() && reader.isRunning()) { + // add a dummy element to signal that no more splits will + // arrive and wait until the reader finishes + reader.addSplit(EOS); + + // we already have the checkpoint lock because close() is + // called by the StreamTask while having it. 
+ checkpointLock.wait(); + } + collector.close(); + } + + private class SplitReader extends Thread { + + private volatile boolean isRunning; + + private final FileInputFormat format; + private final TypeSerializer serializer; + + private final Object checkpointLock; + private final TimestampedCollector collector; + + private final Queue pendingSplits; + + private FileInputSplit currentSplit = null; + + private S restoredFormatState = null; + + SplitReader(FileInputFormat format, + TypeSerializer serializer, + TimestampedCollector collector, + Object checkpointLock, + Tuple3, FileInputSplit, S> restoredState) { + + this.format = checkNotNull(format, "Unspecified FileInputFormat."); + this.serializer = checkNotNull(serializer, "Unspecified Serialized."); + + this.pendingSplits = new LinkedList<>(); + this.collector = collector; + this.checkpointLock = checkpointLock; + this.isRunning = true; + + // this is the case where a task recovers from a previous failed attempt + if (restoredState != null) { + List pending = restoredState.f0; + FileInputSplit current = restoredState.f1; + S formatState = restoredState.f2; + + for (FileInputSplit split : pending) { + pendingSplits.add(split); + } + + this.currentSplit = current; + this.restoredFormatState = formatState; + } + ContinuousFileReaderOperator.this.readerState = null; + } + + void addSplit(FileInputSplit split) { + Preconditions.checkNotNull(split); + synchronized (checkpointLock) { + this.pendingSplits.add(split); + } + } + + public boolean isRunning() { + return this.isRunning; + } + + @Override + public void run() { + try { + while (this.isRunning) { + + synchronized (checkpointLock) { + if (this.currentSplit != null) { + + if (currentSplit.equals(EOS)) { + isRunning = false; + break; + } + + if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) { + ((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState); + } else { + // this is the case of a non-checkpointable input format that will reprocess the last split. + LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable."); + format.open(currentSplit); + } + // reset the restored state to null for the next iteration + this.restoredFormatState = null; + } else { + + // get the next split to read. + currentSplit = this.pendingSplits.poll(); + + if (currentSplit == null) { + checkpointLock.wait(50); + continue; + } + + if (currentSplit.equals(EOS)) { + isRunning = false; + break; + } + this.format.open(currentSplit); + } + } + + LOG.info("Reading split: " + currentSplit); + + try { + OT nextElement = serializer.createInstance(); + while (!format.reachedEnd()) { + synchronized (checkpointLock) { + nextElement = format.nextRecord(nextElement); + if (nextElement != null) { + collector.collect(nextElement); + } else { + break; + } + } + } + + } finally { + // close and prepare for the next iteration + this.format.close(); + this.currentSplit = null; + } + } + + } catch (Throwable e) { + if (isRunning) { + LOG.error("Caught exception processing split: ", currentSplit); + } + getContainingTask().failExternally(e); + } finally { + synchronized (checkpointLock) { + LOG.info("Reader terminated, and exiting..."); + checkpointLock.notifyAll(); + } + } + } + + Tuple3, FileInputSplit, S> getReaderState() throws IOException { + List snapshot = new ArrayList<>(this.pendingSplits.size()); + for (FileInputSplit split: this.pendingSplits) { + snapshot.add(split); + } + + // remove the current split from the list if inside. 
+ if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) { + this.pendingSplits.remove(); + } + + if (this.format instanceof CheckpointableInputFormat) { + S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState(); + return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState); + } else { + LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery."); + return new Tuple3<>(snapshot, currentSplit, null); + } + } + + public void cancel() { + this.isRunning = false; + } + } + + // --------------------- Checkpointing -------------------------- + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + final AbstractStateBackend.CheckpointStateOutputStream os = + this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp); + + final ObjectOutputStream oos = new ObjectOutputStream(os); + final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os); + + Tuple3, FileInputSplit, S> readerState = this.reader.getReaderState(); + List pendingSplits = readerState.f0; + FileInputSplit currSplit = readerState.f1; + S formatState = readerState.f2; + + // write the current split + oos.writeObject(currSplit); + + // write the pending ones + ov.writeInt(pendingSplits.size()); + for (FileInputSplit split : pendingSplits) { + oos.writeObject(split); + } + + // write the state of the reading channel + oos.writeObject(formatState); + taskState.setOperatorState(os.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { + super.restoreState(state, recoveryTimestamp); + + StreamStateHandle stream = (StreamStateHandle) state.getOperatorState(); + + final InputStream is = stream.getState(getUserCodeClassloader()); + final ObjectInputStream ois = new ObjectInputStream(is); + final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is); + + // read the split that was being read + FileInputSplit currSplit = (FileInputSplit) ois.readObject(); + + // read the pending splits list + List pendingSplits = new LinkedList<>(); + int noOfSplits = div.readInt(); + for (int i = 0; i < noOfSplits; i++) { + FileInputSplit split = (FileInputSplit) ois.readObject(); + pendingSplits.add(split); + } + + // read the state of the format + S formatState = (S) ois.readObject(); + + // set the whole reader state for the open() to find. 
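+		// open() hands this tuple to the SplitReader constructor, which re-queues the
+		// pending splits and, if the format is a CheckpointableInputFormat, reopens
+		// the in-flight split at the stored position instead of re-reading it.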
+ this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); + div.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java index fc24079..06da8c1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.functions.source; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; @@ -32,7 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -@PublicEvolving +@Deprecated public class FileMonitoringFunction implements SourceFunction> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java new file mode 100644 index 0000000..1a359ab --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +/** + * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}. + * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further + * processing or not. This can serve to exclude temporary or partial files that + * are still being written. 
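+ * A typical example is a file uploaded under a temporary name containing
+ * "_COPYING_", which the default filter below skips.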
+ */ +@PublicEvolving +public abstract class FilePathFilter implements Serializable { + + public static FilePathFilter createDefaultFilter() { + return new DefaultFilter(); + } + /** + * Returns {@code true} if the {@code filePath} given is to be + * ignored when processing a directory, e.g. + *
+	 * <pre>
+	 * {@code
+	 *
+	 * public boolean filterPath(Path filePath) {
+	 *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+	 * }
+	 * }
+	 * </pre>
+ */ + public abstract boolean filterPath(Path filePath); + + /** + * The default file path filtering method and is used + * if no other such function is provided. This filter leaves out + * files starting with ".", "_", and "_COPYING_". + */ + public static class DefaultFilter extends FilePathFilter { + + DefaultFilter() {} + + @Override + public boolean filterPath(Path filePath) { + return filePath == null || + filePath.getName().startsWith(".") || + filePath.getName().startsWith("_") || + filePath.getName().contains("_COPYING_"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java new file mode 100644 index 0000000..cdbeb2b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Specifies when the computation of the {@link ContinuousFileMonitoringFunction} + * will be triggered. + */ +@PublicEvolving +public enum FileProcessingMode { + + PROCESS_ONCE, // Processes the current content of a file/path only ONCE, and stops monitoring. + PROCESS_CONTINUOUSLY // Reprocesses the whole file when new data is appended. 
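+
+	// Note that PROCESS_CONTINUOUSLY re-reads the whole file once it is modified,
+	// so appending to a monitored file also re-emits its earlier records downstream.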
+} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java index 0f78826..ac1e834 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java @@ -21,7 +21,6 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FSDataInputStream; @@ -29,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; -@PublicEvolving +@Deprecated public class FileReadFunction implements FlatMapFunction, String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java deleted file mode 100644 index 0dcb9ff..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.functions.source; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; - -import java.util.Iterator; -import java.util.NoSuchElementException; - -@PublicEvolving -public class FileSourceFunction extends RichParallelSourceFunction { - private static final long serialVersionUID = 1L; - - private TypeInformation typeInfo; - private transient TypeSerializer serializer; - - private InputFormat format; - - private transient InputSplitProvider provider; - private transient Iterator splitIterator; - - private volatile boolean isRunning = true; - - @SuppressWarnings("unchecked") - public FileSourceFunction(InputFormat format, TypeInformation typeInfo) { - this.format = (InputFormat) format; - this.typeInfo = typeInfo; - } - - @Override - @SuppressWarnings("unchecked") - public void open(Configuration parameters) throws Exception { - StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - this.provider = context.getInputSplitProvider(); - - format.configure(parameters); - serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); - - splitIterator = getInputSplits(); - if (splitIterator.hasNext()) { - format.open(splitIterator.next()); - } - isRunning = true; - } - - @Override - public void close() throws Exception { - format.close(); - } - - private Iterator getInputSplits() { - - return new Iterator() { - - private InputSplit nextSplit; - - private boolean exhausted; - - @Override - public boolean hasNext() { - if (exhausted) { - return false; - } - - if (nextSplit != null) { - return true; - } - - InputSplit split = provider.getNextInputSplit(); - - if (split != null) { - this.nextSplit = split; - return true; - } else { - exhausted = true; - return false; - } - } - - @Override - public InputSplit next() { - if (this.nextSplit == null && !hasNext()) { - throw new NoSuchElementException(); - } - - final InputSplit tmp = this.nextSplit; - this.nextSplit = null; - return tmp; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void run(SourceContext ctx) throws Exception { - while (isRunning) { - OUT nextElement = serializer.createInstance(); - nextElement = format.nextRecord(nextElement); - if (nextElement == null && splitIterator.hasNext()) { - format.open(splitIterator.next()); - continue; - } else if (nextElement == null) { - break; - } - ctx.collect(nextElement); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - - /** - * Returns the {@code InputFormat}. This is only needed because we need to set the input - * split assigner on the {@code StreamGraph}. 
- */ - public InputFormat getFormat() { - return format; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java new file mode 100644 index 0000000..2a84781 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +@Internal +public class InputFormatSource extends RichParallelSourceFunction { + private static final long serialVersionUID = 1L; + + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + private InputFormat format; + + private transient InputSplitProvider provider; + private transient Iterator splitIterator; + + private volatile boolean isRunning = true; + + @SuppressWarnings("unchecked") + public InputFormatSource(InputFormat format, TypeInformation typeInfo) { + this.format = (InputFormat) format; + this.typeInfo = typeInfo; + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + this.provider = context.getInputSplitProvider(); + + format.configure(parameters); + serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + splitIterator = getInputSplits(); + if (splitIterator.hasNext()) { + format.open(splitIterator.next()); + } + isRunning = true; + } + + @Override + public void close() throws Exception { + format.close(); + } + + private Iterator getInputSplits() { + + return new Iterator() { + + private InputSplit nextSplit; + + private boolean exhausted; + + @Override + public boolean hasNext() { + if (exhausted) { + return false; + } + + if (nextSplit != null) { + return true; + } + + InputSplit 
split = provider.getNextInputSplit(); + + if (split != null) { + this.nextSplit = split; + return true; + } else { + exhausted = true; + return false; + } + } + + @Override + public InputSplit next() { + if (this.nextSplit == null && !hasNext()) { + throw new NoSuchElementException(); + } + + final InputSplit tmp = this.nextSplit; + this.nextSplit = null; + return tmp; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (isRunning) { + OUT nextElement = serializer.createInstance(); + nextElement = format.nextRecord(nextElement); + if (nextElement == null && splitIterator.hasNext()) { + format.open(splitIterator.next()); + continue; + } else if (nextElement == null) { + break; + } + ctx.collect(nextElement); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + + /** + * Returns the {@code InputFormat}. This is only needed because we need to set the input + * split assigner on the {@code StreamGraph}. + */ + public InputFormat getFormat() { + return format; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 70c5cff..685655e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.FileSourceFunction; +import org.apache.flink.streaming.api.functions.source.InputFormatSource; import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; import org.apache.flink.streaming.api.transformations.FeedbackTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -425,8 +425,8 @@ public class StreamGraphGenerator { null, source.getOutputType(), "Source: " + source.getName()); - if (source.getOperator().getUserFunction() instanceof FileSourceFunction) { - FileSourceFunction fs = (FileSourceFunction) source.getOperator().getUserFunction(); + if (source.getOperator().getUserFunction() instanceof InputFormatSource) { + InputFormatSource fs = (InputFormatSource) source.getOperator().getUserFunction(); streamGraph.setInputFormat(source.getId(), fs.getFormat()); } streamGraph.setParallelism(source.getId(), source.getParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java index 86677a6..4517eea 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; public interface OutputTypeConfigurable<OUT> { /** - * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)} + * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, String, StreamOperator, TypeInformation, TypeInformation, String)} * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The * method is called with the output {@link TypeInformation} which is also used for the * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer. http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 69e920f..9ed715e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -69,7 +69,7 @@ public interface StreamOperator<OUT> extends Serializable { *

* The method is expected to flush all remaining buffered data. Exceptions during this flushing - * of buffered should be propagated, in order to cause the operation to be recognized asa failed, + * of buffered should be propagated, in order to cause the operation to be recognized as failed, * because the last data items are not processed properly. * * @throws java.lang.Exception An exception in this method causes the operator to fail. http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index c5f983a..6e2e9f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -141,6 +141,10 @@ public class OneInputStreamOperatorTestHarness { }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class)); } + public Object getCheckpointLock() { + return mockTask.getCheckpointLock(); + } + public void configureForKeyedStream(KeySelector keySelector, TypeInformation keyType) { ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector); http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 7be7840..f6dab1e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.scala import com.esotericsoftware.kryo.Serializer - -import org.apache.flink.annotation.{Internal, PublicEvolving, Public} +import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation @@ -28,14 +27,12 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala.ClosureCleaner import org.apache.flink.runtime.state.AbstractStateBackend import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} -import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType -import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source._ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.util.SplittableIterator import scala.collection.JavaConverters._ - import 
_root_.scala.language.implicitConversions @Public @@ -454,20 +451,67 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { DataStream[T] = asScalaStream(javaEnv.readFile(inputFormat, filePath)) - /** - * Creates a DataStream that contains the contents of file created while - * system watches the given path. The file will be read with the system's - * default character set. The user can check the monitoring interval in milliseconds, - * and the way file modifications are handled. By default it checks for only new files - * every 100 milliseconds. - * - */ - def readFileStream(StreamPath: String, intervalMillis: Long = 100, - watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] = + * Creates a DataStream that contains the contents of file created while + * system watches the given path. The file will be read with the system's + * default character set. The user can check the monitoring interval in milliseconds, + * and the way file modifications are handled. By default it checks for only new files + * every 100 milliseconds. + * + */ + @Deprecated + def readFileStream(StreamPath: String, intervalMillis: Long = 100, + watchType: FileMonitoringFunction.WatchType = + FileMonitoringFunction.WatchType.ONLY_NEW_FILES): DataStream[String] = asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType)) /** + * Reads the contents of the user-specified path based on the given [[FileInputFormat]]. + * Depending on the provided [[FileProcessingMode]], the source + * may periodically monitor (every `interval` ms) the path for new data + * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process + * once the data currently in the path and exit + * ([[FileProcessingMode.PROCESS_ONCE]]). In addition, + * if the path contains files not to be processed, the user can specify a custom + * [[FilePathFilter]]. As a default implementation you can use + * [[FilePathFilter.createDefaultFilter()]]. + * + * ** NOTES ON CHECKPOINTING: ** If the `watchType` is set to + * [[FileProcessingMode#PROCESS_ONCE]], the source monitors the path ** once **, + * creates the [[org.apache.flink.core.fs.FileInputSplit FileInputSplits]] + * to be processed, forwards them to the downstream + * [[ContinuousFileReaderOperator readers]] to read the actual data, + * and exits, without waiting for the readers to finish reading. This + * implies that no more checkpoint barriers are going to be forwarded + * after the source exits, thus having no checkpoints after that point. + * + * @param inputFormat + * The input format used to create the data stream + * @param filePath + * The path of the file, as a URI (e.g., "file:///some/local/file" or + * "hdfs://host:port/file/path") + * @param watchType + * The mode in which the source should operate, i.e. 
monitor path and react + * to new data, or process once and exit + * @param interval + * In the case of periodic path monitoring, this specifies the interval (in millis) + * between consecutive path scans + * @param filter + * The files to be excluded from the processing + * @return The data stream that represents the data read from the given file + */ + @PublicEvolving + def readFile[T: TypeInformation]( + inputFormat: FileInputFormat[T], + filePath: String, + watchType: FileProcessingMode, + interval: Long, + filter: FilePathFilter): DataStream[T] = { + val typeInfo = implicitly[TypeInformation[T]] + asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo)) + } + + /** * Creates a new DataStream that contains the strings received infinitely * from socket. Received strings are decoded by the system's default * character set. The maximum retry interval is specified in seconds, in case http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java new file mode 100644 index 0000000..4c0f648 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase { + + private static final int NO_OF_FILES = 9; + private static final int LINES_PER_FILE = 200; + private static final int NO_OF_RETRIES = 3; + private static final int PARALLELISM = 4; + private static final long INTERVAL = 2000; + + private static File baseDir; + private static org.apache.hadoop.fs.FileSystem fs; + private static String localFsURI; + private FileCreator fc; + + private static Map> finalCollectedContent = new HashMap<>(); + + @BeforeClass + public static void createHDFS() { + try { + baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + + localFsURI = "file:///" + baseDir +"/"; + fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @AfterClass + public static void destroyHDFS() { + try { + FileUtil.fullyDelete(baseDir); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + @Override + public void testProgram(StreamExecutionEnvironment env) { + + // set the restart strategy. + env.getConfig().setRestartStrategy( + RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0)); + env.enableCheckpointing(20); + env.setParallelism(PARALLELISM); + + // create and start the file creating thread. + fc = new FileCreator(); + fc.start(); + + // create the monitoring source along with the necessary readers. 
+ TestingSinkFunction sink = new TestingSinkFunction(); + TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI)); + DataStream inputStream = env.readFile(format, localFsURI, + FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter()); + + inputStream.flatMap(new FlatMapFunction() { + @Override + public void flatMap(String value, Collector out) throws Exception { + out.collect(value); + } + }).addSink(sink).setParallelism(1); + } + + @Override + public void postSubmit() throws Exception { + Map> collected = finalCollectedContent; + Assert.assertEquals(collected.size(), fc.getFileContent().size()); + for (Integer fileIdx: fc.getFileContent().keySet()) { + Assert.assertTrue(collected.keySet().contains(fileIdx)); + + List cntnt = collected.get(fileIdx); + Collections.sort(cntnt, new Comparator() { + @Override + public int compare(String o1, String o2) { + return getLineNo(o1) - getLineNo(o2); + } + }); + + StringBuilder cntntStr = new StringBuilder(); + for (String line: cntnt) { + cntntStr.append(line); + } + Assert.assertEquals(fc.getFileContent().get(fileIdx), cntntStr.toString()); + } + + collected.clear(); + finalCollectedContent.clear(); + fc.clean(); + } + + private int getLineNo(String line) { + String[] tkns = line.split("\\s"); + Assert.assertTrue(tkns.length == 6); + return Integer.parseInt(tkns[tkns.length - 1]); + } + + // -------------------------------------------------------------------------------------------- + // Custom Functions + // -------------------------------------------------------------------------------------------- + + // ------------------------- FILE CREATION ------------------------------- + + /** + * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds. + * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}. + * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method. + * */ + private class FileCreator extends Thread { + + private final Set filesCreated = new HashSet<>(); + private final Map fileContents = new HashMap<>(); + + public void run() { + try { + for(int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = + fillWithData(localFsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + fileContents.put(i, file.f1); + + Thread.sleep((int) (INTERVAL / (3.0/2))); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + void clean() throws IOException { + assert (fs != null); + for (org.apache.hadoop.fs.Path path: filesCreated) { + fs.delete(path, false); + } + fileContents.clear(); + } + + Map getFileContent() { + return this.fileContents; + } + } + + /** + * Fill the file with content and put the content in the {@code hdPathContents} list. + * */ + private Tuple2 fillWithData( + String base, String fileName, int fileIdx, String sampleLine) throws IOException { + + assert (fs != null); + + org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx); + + org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." 
+ fileName + fileIdx); + FSDataOutputStream stream = fs.create(tmp); + StringBuilder str = new StringBuilder(); + for(int i = 0; i < LINES_PER_FILE; i++) { + String line = fileIdx +": "+ sampleLine + " " + i +"\n"; + str.append(line); + stream.write(line.getBytes()); + } + stream.close(); + fs.rename(tmp, file); + Assert.assertTrue("No result file present", fs.exists(file)); + return new Tuple2<>(file, str.toString()); + } + + // -------------------------- Task Sink ------------------------------ + + private static class TestingSinkFunction extends RichSinkFunction + implements Checkpointed>>>, CheckpointListener { + + private static volatile boolean hasFailed = false; + + private volatile int numSuccessfulCheckpoints; + + private long count; + + private long elementsToFailure; + + private long elementCounter = 0; + + private Map> collectedContent = new HashMap<>(); + + TestingSinkFunction() { + hasFailed = false; + } + + @Override + public void open(Configuration parameters) throws Exception { + // this sink can only work with DOP 1 + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + + long failurePosMin = (long) (0.4 * LINES_PER_FILE); + long failurePosMax = (long) (0.7 * LINES_PER_FILE); + + elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; + + if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { + finalCollectedContent = new HashMap<>(); + for (Map.Entry> result: collectedContent.entrySet()) { + finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); + } + throw new SuccessException(); + } + } + + @Override + public void close() { + try { + super.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void invoke(String value) throws Exception { + int fileIdx = Character.getNumericValue(value.charAt(0)); + + Set content = collectedContent.get(fileIdx); + if (content == null) { + content = new HashSet<>(); + collectedContent.put(fileIdx, content); + } + + if (!content.add(value + "\n")) { + fail("Duplicate line: " + value); + System.exit(0); + } + + + elementCounter++; + if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { + finalCollectedContent = new HashMap<>(); + for (Map.Entry> result: collectedContent.entrySet()) { + finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); + } + throw new SuccessException(); + } + + count++; + if (!hasFailed) { + Thread.sleep(2); + if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) { + hasFailed = true; + throw new Exception("Task Failure"); + } + } + } + + @Override + public Tuple2>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return new Tuple2<>(elementCounter, collectedContent); + } + + @Override + public void restoreState(Tuple2>> state) throws Exception { + this.elementCounter = state.f0; + this.collectedContent = state.f1; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + numSuccessfulCheckpoints++; + } + } +}
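
For reference, here is a minimal, self-contained Java sketch of how the file-source API introduced by this commit is used; it mirrors the readFile() call in the ITCase above. The input path is a placeholder, and the checkpoint and scan intervals are arbitrary example values:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FilePathFilter;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ContinuousFileSourceExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The point of FLINK-2314: the file source is now persistent, i.e.
            // it takes part in checkpointing and restores which files/offsets
            // were already processed after a failure.
            env.enableCheckpointing(5000);

            String path = "file:///tmp/input"; // placeholder directory
            TextInputFormat format = new TextInputFormat(new Path(path));

            // PROCESS_CONTINUOUSLY: re-scan the path every 2000 ms for new data.
            // With PROCESS_ONCE the source would forward the current splits once
            // and exit, so no checkpoint barriers flow after that point.
            DataStream<String> lines = env.readFile(
                format, path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000L,
                FilePathFilter.createDefaultFilter());

            lines.print();
            env.execute("Continuous file source example");
        }
    }

As described in the readFile() documentation above, monitoring and split creation are handled by the ContinuousFileMonitoringFunction, while the actual reading happens in the downstream ContinuousFileReaderOperator readers.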