flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [3/4] flink git commit: [FLINK-1756] [streaming] Rename Stream Monitoring to Stream Checkpointing
Date Sun, 22 Mar 2015 17:35:27 GMT
[FLINK-1756] [streaming] Rename Stream Monitoring to Stream Checkpointing

Also set the default checkpoint interval to 5 secs instead of 10.

This closes #506


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2842e2fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2842e2fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2842e2fd

Branch: refs/heads/master
Commit: 2842e2fd9893add426c42aa40436b898e40a1ffc
Parents: 1179649
Author: mbalassi <mbalassi@apache.org>
Authored: Fri Mar 20 16:06:56 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Mar 22 16:35:06 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 30 ++++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   |  8 +++---
 .../apache/flink/streaming/api/StreamGraph.java | 20 ++++++-------
 .../api/StreamingJobGraphGenerator.java         |  6 ++--
 .../environment/StreamExecutionEnvironment.java | 10 +++----
 .../api/scala/StreamExecutionEnvironment.scala  |  8 +++---
 6 files changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 46ec445..91d01a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,11 +178,11 @@ public class ExecutionGraph implements Serializable {
 
 	private ActorContext parentContext;
 
-	private  ActorRef stateMonitorActor;
+	private  ActorRef stateCheckpointerActor;
 
-	private boolean monitoringEnabled;
+	private boolean checkpointingEnabled;
 
-	private long monitoringInterval = 10000;
+	private long checkpointingInterval = 5000;
 
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
 		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
@@ -223,12 +223,12 @@ public class ExecutionGraph implements Serializable {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void setStateMonitorActor(ActorRef stateMonitorActor) {
-		this.stateMonitorActor = stateMonitorActor;
+	public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) {
+		this.stateCheckpointerActor = stateCheckpointerActor;
 	}
 
-	public ActorRef getStateMonitorActor() {
-		return stateMonitorActor;
+	public ActorRef getStateCheckpointerActor() {
+		return stateCheckpointerActor;
 	}
 
 	public void setParentContext(ActorContext parentContext) {
@@ -289,12 +289,12 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public void setMonitoringEnabled(boolean monitoringEnabled) {
-		this.monitoringEnabled = monitoringEnabled;
+	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+		this.checkpointingEnabled = checkpointingEnabled;
 	}
 
-	public void setMonitoringInterval(long  monitoringInterval) {
-		this.monitoringInterval = monitoringInterval;
+	public void setCheckpointingInterval(long checkpointingInterval) {
+		this.checkpointingInterval = checkpointingInterval;
 	}
 
 	/**
@@ -451,9 +451,9 @@ public class ExecutionGraph implements Serializable {
 					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
 
-			if (monitoringEnabled) {
-				stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
-						Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+			if (checkpointingEnabled) {
+				stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this,
+						Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS));
 			}
 		}
 		else {
@@ -777,6 +777,6 @@ public class ExecutionGraph implements Serializable {
 		
 		scheduler = null;
 		parentContext = null;
-		stateMonitorActor = null;
+		stateCheckpointerActor = null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 70a1cbb..61a0aea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -311,13 +311,13 @@ class JobManager(val configuration: Configuration,
     case msg: BarrierAck =>
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          jobExecution._1.getStateMonitorActor forward  msg
+          jobExecution._1.getStateCheckpointerActor forward  msg
         case None =>
       }
     case msg: StateBarrierAck =>
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          jobExecution._1.getStateMonitorActor forward  msg
+          jobExecution._1.getStateCheckpointerActor forward  msg
         case None =>
       }
       
@@ -487,8 +487,8 @@ class JobManager(val configuration: Configuration,
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
         
-        executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
-        executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
+        executionGraph.setCheckpointingEnabled(jobGraph.isMonitoringEnabled)
+        executionGraph.setCheckpointingInterval(jobGraph.getMonitorInterval)
 
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 71706bc..970ce49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,9 +92,9 @@ public class StreamGraph extends StreamingPlan {
 
 	private ExecutionConfig executionConfig;
 	
-	private boolean monitoringEnabled;
+	private boolean checkpointingEnabled;
 	
-	private long monitoringInterval = 10000;
+	private long checkpointingInterval = 5000;
 
 	public StreamGraph(ExecutionConfig executionConfig) {
 
@@ -559,20 +559,20 @@ public class StreamGraph extends StreamingPlan {
 		return executionConfig;
 	}
 
-	public void setMonitoringEnabled(boolean monitoringEnabled) {
-		this.monitoringEnabled = monitoringEnabled;
+	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+		this.checkpointingEnabled = checkpointingEnabled;
 	}
 
-	public boolean isMonitoringEnabled() {
-		return monitoringEnabled;
+	public boolean isCheckpointingEnabled() {
+		return checkpointingEnabled;
 	}
 
-	public void setMonitoringInterval(long monitoringInterval) {
-		this.monitoringInterval = monitoringInterval;
+	public void setCheckpointingInterval(long checkpointingInterval) {
+		this.checkpointingInterval = checkpointingInterval;
 	}
 
-	public long getMonitoringInterval() {
-		return monitoringInterval;
+	public long getCheckpointingInterval() {
+		return checkpointingInterval;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 544ccc6..ad744d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -78,8 +78,8 @@ public class StreamingJobGraphGenerator {
 		// Turn lazy scheduling off
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
 		jobGraph.setJobType(JobGraph.JobType.STREAMING);
-		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
-		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+		jobGraph.setMonitoringEnabled(streamGraph.isCheckpointingEnabled());
+		jobGraph.setMonitorInterval(streamGraph.getCheckpointingInterval());
 
 		if(jobGraph.isMonitoringEnabled()) {
 			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
@@ -257,7 +257,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
-		config.setStateMonitoring(streamGraph.isMonitoringEnabled());
+		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
 
 		Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2096745..994ff15 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,9 +163,9 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @param interval Time interval between state checkpoints in millis
 	 */
-	public StreamExecutionEnvironment enableMonitoring(long interval) {
-		streamGraph.setMonitoringEnabled(true);
-		streamGraph.setMonitoringInterval(interval);
+	public StreamExecutionEnvironment enableCheckpointing(long interval) {
+		streamGraph.setCheckpointingEnabled(true);
+		streamGraph.setCheckpointingInterval(interval);
 		return this;
 	}
 
@@ -178,8 +178,8 @@ public abstract class StreamExecutionEnvironment {
 	 * otherwise with calling with the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
 	 * method in case of failure the job will be resubmitted to the cluster indefinitely.
 	 */
-	public StreamExecutionEnvironment enableMonitoring() {
-		streamGraph.setMonitoringEnabled(true);
+	public StreamExecutionEnvironment enableCheckpointing() {
+		streamGraph.setCheckpointingEnabled(true);
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 4672fca..2208388 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -84,8 +84,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
-  def enableMonitoring(interval : Long) : StreamExecutionEnvironment = {
-    javaEnv.enableMonitoring(interval)
+  def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval)
     this
   }
 
@@ -98,8 +98,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
-  def enableMonitoring() : StreamExecutionEnvironment = {
-    javaEnv.enableMonitoring()
+  def enableCheckpointing() : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing()
     this
   }
 


Mime
View raw message