flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [hotfix] Fix JavaDocs for SourceFunction
Date Thu, 03 Mar 2016 10:22:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master 072da7def -> 7fbfab64f


[hotfix] Fix JavaDocs for SourceFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fbfab64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fbfab64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fbfab64

Branch: refs/heads/master
Commit: 7fbfab64f062301f3e8288d040d41ba3529c105d
Parents: 072da7d
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Mar 3 11:13:13 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Mar 3 11:13:13 2016 +0100

----------------------------------------------------------------------
 .../api/functions/source/SourceFunction.java    | 172 +++++++++++++------
 1 file changed, 119 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fbfab64/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 1f4e1e4..f1619b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -21,36 +21,33 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.TimestampAssigner;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
 import java.io.Serializable;
 
 /**
  * Base interface for all stream data sources in Flink. The contract of a stream source
- * is the following: When the source should start emitting elements the {@link #run} method
- * is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements.
+ * is the following: When the source should start emitting elements, the {@link #run} method
+ * is called with a {@link SourceContext} that can be used for emitting elements.
  * The run method can run for as long as necessary. The source must, however, react to an
- * invocation of {@link #cancel} by breaking out of its main loop.
+ * invocation of {@link #cancel()} by breaking out of its main loop.
  *
- * <p>
- * <b>Note about checkpointed sources</b> <br>
+ * <h3>Checkpointed Sources</h3>
  *
- * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+ * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
  * interface must ensure that state checkpointing, updating of internal state and emission of
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
- * </p>
  *
- * <p>
- * This is the basic pattern one should follow when implementing a (checkpointed) source:
- * </p>
+ * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
  *
  * <pre>{@code
  *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
- *      {@literal @}Override
  *      public void run(SourceContext<T> ctx) {
  *          while (isRunning && count < 1000) {
  *              synchronized (ctx.getCheckpointLock()) {
@@ -60,58 +57,102 @@ import java.io.Serializable;
  *          }
  *      }
  *
- *      {@literal @}Override
  *      public void cancel() {
  *          isRunning = false;
  *      }
  *
- *      {@literal @}Override
  *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
  *
- *      {@literal @}Override
  *      public void restoreState(Long state) { this.count = state; }
  * }
  * }</pre>
  *
  *
- * <p>
- * <b>Note about element timestamps and watermarks:</b> <br>
- * Sources must only manually emit watermarks when they implement
- * {@link EventTimeSourceFunction }.
- * Otherwise, elements automatically get the current timestamp assigned at ingress
- * and the system automatically emits watermarks.
+ * <h3>Timestamps and Watermarks</h3>
+ *
+ * <p>Sources may assign timestamps to elements and may manually emit watermarks.
+ * However, these are only interpreted if the streaming program runs on
+ * {@link TimeCharacteristic#EventTime}. On other time characteristics
+ * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
+ * the timestamps and watermarks from the source function are ignored or overwritten.
  *
+ * <h3>Gracefully Stopping Functions</h3>
+ * 
+ * <p>Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
+ * interface. "Stopping" a function, in contrast to "canceling", means a graceful exit that leaves the
+ * state and the emitted elements in a consistent state.
+ *
+ * <p>When a source is stopped, the executing thread is not interrupted, but is expected to leave the
+ * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
+ * of state updates and element emission.
+ * 
  * @param <T> The type of the elements produced by this source.
+ * 
+ * @see org.apache.flink.api.common.functions.StoppableFunction
+ * @see org.apache.flink.streaming.api.TimeCharacteristic
  */
 @Public
 public interface SourceFunction<T> extends Function, Serializable {
 
 	/**
-	 * Starts the source. You can use the {@link org.apache.flink.util.Collector} parameter to emit
-	 * elements. Sources that implement
-	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} must lock on the
-	 * checkpoint lock (using a synchronized block) before updating internal state and/or emitting
-	 * elements. Also, the update of state and emission of elements must happen in the same
-	 * synchronized block.
+	 * Starts the source. Implementations can use the {@link SourceContext} to emit
+	 * elements.
+	 * 
+	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+	 * must lock on the checkpoint lock (using a synchronized block) before updating internal
+	 * state and emitting elements, to make both an atomic operation:
 	 *
-	 * @param ctx The context for interaction with the outside world.
+	 * <pre>{@code
+	 *  public class ExampleSource implements SourceFunction<Long>, Checkpointed<Long> {
+	 *      private long count = 0L;
+	 *      private volatile boolean isRunning = true;
+	 *
+	 *      public void run(SourceContext<Long> ctx) {
+	 *          while (isRunning && count < 1000) {
+	 *              synchronized (ctx.getCheckpointLock()) {
+	 *                  ctx.collect(count);
+	 *                  count++;
+	 *              }
+	 *          }
+	 *      }
+	 *
+	 *      public void cancel() {
+	 *          isRunning = false;
+	 *      }
+	 *
+	 *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+	 *
+	 *      public void restoreState(Long state) { this.count = state; }
+	 * }
+	 * }</pre>
+	 *
+	 * @param ctx The context to emit elements to and for accessing locks.
 	 */
 	void run(SourceContext<T> ctx) throws Exception;
 
 	/**
 	 * Cancels the source. Most sources will have a while loop inside the
-	 * {@link #run} method. You need to ensure that the source will break out of this loop. This
-	 * can be achieved by having a volatile field "isRunning" that is checked in the loop and that
-	 * is set to false in this method.
+	 * {@link #run(SourceContext)} method. The implementation needs to ensure that the
+	 * source will break out of that loop after this method is called.
+	 * 
+	 * <p>A typical pattern is to have a {@code volatile boolean isRunning} flag that is set to
+	 * {@code false} in this method. That flag is checked in the loop condition.
+	 * 
+	 * <p>When a source is canceled, the executing thread will also be interrupted
+	 * (via {@link Thread#interrupt()}). The interruption happens strictly after this
+	 * method has been called, so any interruption handler can rely on the fact that
+	 * this method has completed. It is good practice to make any flags altered by
+	 * this method "volatile", in order to guarantee the visibility of the effects of
+	 * this method to any interruption handler.
 	 */
 	void cancel();
 
+	// ------------------------------------------------------------------------
+	//  source context
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Interface that source functions use to communicate with the outside world. Normally
-	 * sources would just emit elements in a loop using {@link #collect}. If the source is a
-	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
-	 * the checkpoint lock object and use it to protect state updates and element emission as
-	 * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+	 * Interface that source functions use to emit elements, and possibly watermarks.
 	 *
 	 * @param <T> The type of the elements produced by the source.
 	 */
@@ -119,49 +160,74 @@ public interface SourceFunction<T> extends Function, Serializable {
 	interface SourceContext<T> {
 
 		/**
-		 * Emits one element from the source. The result of {@link System#currentTimeMillis()} is set as
-		 * the timestamp of the emitted element.
+		 * Emits one element from the source, without attaching a timestamp. In most cases,
+		 * this is the default way of emitting elements.
+		 * 
+		 * <p>The timestamp that the element will get assigned depends on the time characteristic of
+		 * the streaming program:
+		 * <ul>
+		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
+		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
+		 *         current time as the timestamp.</li>
+		 *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
+		 *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
+		 *         operation (like time windows).</li>
+		 * </ul>
 		 *
 		 * @param element The element to emit
 		 */
 		void collect(T element);
 
 		/**
-		 * Emits one element from the source with the given timestamp.
+		 * Emits one element from the source, and attaches the given timestamp. This method
+		 * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
+		 * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
+		 * on the stream.
+		 *
+		 * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
+		 * This allows programs to switch between the different time characteristics and behaviors
+		 * without changing the code of the source functions.
+		 * <ul>
+		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
+		 *         because processing time never works with element timestamps.</li>
+		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
+		 *         system's current time, to realize proper ingestion time semantics.</li>
+		 *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
+		 * </ul>
 		 *
 		 * @param element The element to emit
-		 * @param timestamp The timestamp in milliseconds
+		 * @param timestamp The timestamp in milliseconds since the Epoch
 		 */
 		@PublicEvolving
 		void collectWithTimestamp(T element, long timestamp);
 
 		/**
-		 * Emits the given {@link org.apache.flink.streaming.api.watermark.Watermark}.
-		 *
-		 * <p>
-		 * <b>Important:</b>
-		 * Sources must only manually emit watermarks when they implement
-		 * {@link EventTimeSourceFunction}.
-		 * Otherwise, elements automatically get the current timestamp assigned at ingress
-		 * and the system automatically emits watermarks.
+		 * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
+		 * elements with a timestamp {@code t' <= t} will occur any more. If further such
+		 * elements are emitted anyway, those elements are considered <i>late</i>.
+		 * 
+		 * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
+		 * On {@link TimeCharacteristic#ProcessingTime}, Watermarks will be ignored. On
+		 * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
+		 * automatic ingestion time watermarks.
 		 *
-		 * @param mark The {@link Watermark} to emit
+		 * @param mark The Watermark to emit
 		 */
 		@PublicEvolving
 		void emitWatermark(Watermark mark);
 
 
 		/**
-		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
-		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+		 * Returns the checkpoint lock. Please refer to the class-level comment in
+		 * {@link SourceFunction} for details about how to write a consistent checkpointed
+		 * source.
 		 * 
-		 * @return The object to use as the lock. 
+		 * @return The object to use as the lock 
 		 */
 		Object getCheckpointLock();
 
 		/**
-		 * This must be called when closing the source operator to allow the {@link SourceContext}
-		 * to clean up internal state.
+		 * This method is called by the system to shut down the context.
 		 */
 		void close();
 	}


Mime
View raw message