flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [streaming] Improve comments for source functions. Minor cleanups.
Date Sun, 22 Mar 2015 00:43:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master ff60ecfb2 -> 244e5d5f8


[streaming] Improve comments for source functions. Minor cleanups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f372358b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f372358b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f372358b

Branch: refs/heads/master
Commit: f372358bee992c5f25c315f99226bf5a65b6bbd9
Parents: 925481f
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Mar 20 17:12:11 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Mar 21 22:04:30 2015 +0100

----------------------------------------------------------------------
 .../function/source/ParallelSourceFunction.java | 12 ++++++++--
 .../source/RichParallelSourceFunction.java      | 18 +++++++++-----
 .../api/function/source/RichSourceFunction.java | 25 +++++++++++++++++---
 .../api/function/source/SourceFunction.java     | 23 +++++++++++-------
 .../streamvertex/StreamingRuntimeContext.java   |  3 +--
 5 files changed, 59 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f372358b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
index e37e851..3b96bf9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -18,9 +18,17 @@
 package org.apache.flink.streaming.api.function.source;
 
 /**
- * {@link SourceFunction} that may be executed in parallel.
+ * A stream data source that is executed in parallel. Upon execution, the runtime will
+ * execute as many parallel instances of this function as the configured parallelism
+ * of the source.
  *
- * @param <OUT>
+ * <p>This interface acts only as a marker to tell the system that this source may
+ * be executed in parallel. When different parallel instances are required to perform
+ * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
+ * context, which reveals information like the number of parallel tasks, and which parallel
+ * task the current instance is.
+ *
+ * @param <OUT> The type of the records produced by this source.
  */
 public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f372358b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
index 5bbfd4c..028c06a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,16 +12,22 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 
-public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
implements
-		ParallelSourceFunction<OUT> {
+/**
+ * Base class for implementing a parallel data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
+		implements ParallelSourceFunction<OUT> {
 
 	private static final long serialVersionUID = 1L;
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f372358b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
index 4b947c7..b9331cb 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
@@ -19,9 +19,28 @@ package org.apache.flink.streaming.api.function.source;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
-		SourceFunction<OUT> {
+/**
+ * Base class for implementing a data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).
+ *
+ * <p>Note that, like any plain {@link SourceFunction}, this source is executed with a
+ * parallelism of 1. When implementing parallel sources where different parallel subtasks
+ * need to perform different work, extend {@link RichParallelSourceFunction} instead.
+ * Typical patterns for that are:
+ * <ul>
+ *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
+ *         to determine the current parallelism. It is strongly encouraged to use this method,
+ *         rather than hard-wiring the parallelism, because the configured parallelism may change
+ *         depending on the program configuration. The parallelism may also change after recovering
+ *         from failures, when fewer than the desired number of parallel workers are available.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()}
+ *         to determine which parallel subtask the current instance of the function represents.</li>
+ * </ul>
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
SourceFunction<OUT> {
 
 	private static final long serialVersionUID = 1L;
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f372358b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index af63d80..ec8c226 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -23,29 +23,34 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
 /**
- * Interface for implementing user defined source functionality.
+ * Interface for a stream data source.
  *
  * <p>Sources implementing this specific interface are executed with
  * degree of parallelism 1. To execute your sources in parallel
  * see {@link ParallelSourceFunction}.</p>
  *
- * @param <OUT> Output type parameter.
+ * @param <OUT> The type of the records produced by this source.
  */
 public interface SourceFunction<OUT> extends Function, Serializable {
 
 	/**
-	 * Function for standard source behaviour. This function is called only once
-	 * thus to produce multiple outputs make sure to produce multiple records.
+	 * Main work method of the source. This function is invoked at the beginning of the
+	 * source's life and is expected to produce its data by "pushing" the records into
+	 * the given collector.
 	 *
-	 * @param collector Collector for passing output records
-	 * @throws Exception
+	 * @param collector The collector that forwards records to the source's consumers.
+	 *
+	 * @throws Exception Throwing any type of exception will cause the source to be considered
+	 *                   failed. When fault tolerance is enabled, recovery will be triggered,
+	 *                   which may create a new instance of this source.
 	 */
 	public void run(Collector<OUT> collector) throws Exception;
 
 	/**
-	 * In case another vertex in topology fails this method is called before terminating
-	 * the source. Make sure to free up any allocated resources here.
+	 * This method signals the source function to cancel its operation.
+	 * The method is called by the framework if the task is to be aborted prematurely.
+	 * This happens when the user cancels the job, or when the task is canceled as
+	 * part of a program failure and cleanup.
 	 */
 	public void cancel();
-		
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f372358b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index da083fb..ff876b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.state.OperatorState;
  */
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
-	public Environment env;
+	private final Environment env;
 	private final Map<String, OperatorState<?>> operatorStates;
 
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
@@ -85,7 +85,6 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 *            The name of the operator state.
 	 * @param state
 	 *            The state to be registered for this name.
-	 * @return The data stream with state registered.
 	 */
 	public void registerState(String name, OperatorState<?> state) {
 		if (state == null) {


Mime
View raw message