flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject flink git commit: [streaming] StreamExecutionEnvironment methods for streaming fault tolerance
Date Fri, 20 Mar 2015 17:32:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 55db268f0 -> 79000c85b


[streaming] StreamExecutionEnvironment methods for streaming fault tolerance


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79000c85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79000c85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79000c85

Branch: refs/heads/master
Commit: 79000c85be7dd5b40c988bdc490ad6ee22c6c6b7
Parents: 55db268
Author: mbalassi <mbalassi@apache.org>
Authored: Fri Mar 20 14:54:49 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Mar 20 17:00:57 2015 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 51 +++++++++++++++++---
 .../api/scala/StreamExecutionEnvironment.scala  | 45 +++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79000c85/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3a617dc..2096745 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -152,21 +152,60 @@ public abstract class StreamExecutionEnvironment {
 		this.bufferTimeout = timeoutMillis;
 		return this;
 	}
-	
-	public StreamExecutionEnvironment enableMonitoring(long interval)
-	{
+
+	/**
+	 * Method for enabling fault-tolerance. Activates monitoring and backup of streaming operator
states.
+	 *
+	 * <p>
+	 * Setting this option assumes that the job is used in production and thus if not stated
explicitly
+	 * otherwise with calling with the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
+	 * method in case of failure the job will be resubmitted to the cluster indefinitely.
+	 *
+	 * @param interval Time interval between state checkpoints in millis
+	 */
+	public StreamExecutionEnvironment enableMonitoring(long interval) {
 		streamGraph.setMonitoringEnabled(true);
 		streamGraph.setMonitoringInterval(interval);
 		return this;
 	}
-	
-	public StreamExecutionEnvironment enableMonitoring()
-	{
+
+
+	/**
+	 * Method for enabling fault-tolerance. Activates monitoring and backup of streaming operator
states.
+	 *
+	 * <p>
+	 * Setting this option assumes that the job is used in production and thus if not stated
explicitly
+	 * otherwise with calling with the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
+	 * method in case of failure the job will be resubmitted to the cluster indefinitely.
+	 */
+	public StreamExecutionEnvironment enableMonitoring() {
 		streamGraph.setMonitoringEnabled(true);
 		return this;
 	}
 
 	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 *
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute
failed tasks.
+	 */
+	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
+	}
+
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 *
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return config.getNumberOfExecutionRetries();
+	}
+
+	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
 	 * output buffers. For clarification on the extremal values see
 	 * {@link #setBufferTimeout(long)}.

http://git-wip-us.apache.org/repos/asf/flink/blob/79000c85/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 1212b2b..4672fca 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -76,6 +76,51 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getBufferTimout: Long = javaEnv.getBufferTimeout()
 
   /**
+   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
+   * operator states. Time interval between state checkpoints is specified in in millis.
+   *
+   * Setting this option assumes that the job is used in production and thus if not stated
+   * explicitly otherwise with calling with the
+   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * failure the job will be resubmitted to the cluster indefinitely.
+   */
+  def enableMonitoring(interval : Long) : StreamExecutionEnvironment = {
+    javaEnv.enableMonitoring(interval)
+    this
+  }
+
+  /**
+   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
+   * operator states. Time interval between state checkpoints is specified in in millis.
+   *
+   * Setting this option assumes that the job is used in production and thus if not stated
+   * explicitly otherwise with calling with the
+   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * failure the job will be resubmitted to the cluster indefinitely.
+   */
+  def enableMonitoring() : StreamExecutionEnvironment = {
+    javaEnv.enableMonitoring()
+    this
+  }
+
+  /**
+   * Sets the number of times that failed tasks are re-executed. A value of zero
+   * effectively disables fault tolerance. A value of "-1" indicates that the system
+   * default value (as defined in the configuration) should be used.
+   */
+  def setNumberOfExecutionRetries(numRetries: Int): Unit = {
+    javaEnv.setNumberOfExecutionRetries(numRetries)
+  }
+
+  /**
+   * Gets the number of times the system will try to re-execute failed tasks. A value
+   * of "-1" indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
+
+
+  /**
    * Registers the given type with the serializer at the [[KryoSerializer]].
    *
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),


Mime
View raw message