flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [hotfix] Code cleanups in the StreamConfig
Date Wed, 29 Jul 2015 16:43:57 GMT
[hotfix] Code cleanups in the StreamConfig


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/833862a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/833862a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/833862a9

Branch: refs/heads/master
Commit: 833862a999326a8c1b236af0418c7bd3423c7097
Parents: 7bd57d7
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jul 29 14:49:23 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 29 18:43:14 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamConfig.java | 55 +++++++++++++-------
 .../flink/streaming/api/graph/StreamGraph.java  |  8 ++-
 .../api/graph/StreamingJobGraphGenerator.java   |  2 +-
 3 files changed, 44 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index d0e8064..1562f38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
@@ -37,6 +38,10 @@ public class StreamConfig implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
+	// ------------------------------------------------------------------------
+	//  Config Keys
+	// ------------------------------------------------------------------------
+	
 	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
 	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
 	private static final String CHAINED_OUTPUTS = "chainedOutputs";
@@ -59,16 +64,22 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+
+	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
 	private static final String STATE_PARTITIONER = "statePartitioner";
 
-	// DEFAULT VALUES
+	// ------------------------------------------------------------------------
+	//  Default Values
+	// ------------------------------------------------------------------------
+	
 	private static final long DEFAULT_TIMEOUT = 100;
-	public static final String STATE_MONITORING = "STATE_MONITORING";
 
-	// CONFIG METHODS
+	// ------------------------------------------------------------------------
+	//  Config
+	// ------------------------------------------------------------------------
 
-	private Configuration config;
+	private final Configuration config;
 
 	public StreamConfig(Configuration config) {
 		this.config = config;
@@ -78,6 +89,11 @@ public class StreamConfig implements Serializable {
 		return config;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Configured Properties
+	// ------------------------------------------------------------------------
+	
+
 	public void setVertexID(Integer vertexID) {
 		config.setInteger(VERTEX_NAME, vertexID);
 	}
@@ -335,12 +351,12 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setStateMonitoring(boolean stateMonitoring) {
-		config.setBoolean(STATE_MONITORING, stateMonitoring);
+	public void setCheckpointingEnabled(boolean enabled) {
+		config.setBoolean(CHECKPOINTING_ENABLED, enabled);
 	}
 
-	public boolean getStateMonitoring() {
-		return config.getBoolean(STATE_MONITORING, false);
+	public boolean isCheckpointingEnabled() {
+		return config.getBoolean(CHECKPOINTING_ENABLED, false);
 	}
 
 	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
@@ -435,28 +451,29 @@ public class StreamConfig implements Serializable {
 		builder.append("\n=======================");
 		builder.append("Stream Config");
 		builder.append("=======================");
-		builder.append("\nTask name: " + getVertexID());
-		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
-		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
-		builder.append("\nOutput names: " + getNonChainedOutputs(cl));
+		builder.append("\nTask name: ").append(getVertexID());
+		builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
+		builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
+		builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
 		builder.append("\nPartitioning:");
 		for (StreamEdge output : getNonChainedOutputs(cl)) {
 			int outputname = output.getTargetId();
-			builder.append("\n\t" + outputname + ": " + output.getPartitioner());
+			builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
 		}
 
-		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+		builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
 
 		try {
-			builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
-		} catch (Exception e) {
+			builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
+		}
+		catch (Exception e) {
 			builder.append("\nOperator: Missing");
 		}
-		builder.append("\nBuffer timeout: " + getBufferTimeout());
-		builder.append("\nState Monitoring: " + getStateMonitoring());
+		builder.append("\nBuffer timeout: ").append(getBufferTimeout());
+		builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
 		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
 			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
-			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+			builder.append(getTransitiveChainedTaskConfigs(cl));
 		}
 
 		return builder.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index f1428b4..4de5224 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -42,6 +42,7 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -66,14 +67,19 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamGraph extends StreamingPlan {
 
+	/** The default interval for checkpoints, in milliseconds */
+	public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
+	
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+
 	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
 	private final StreamExecutionEnvironment environemnt;
 	private final ExecutionConfig executionConfig;
 
+	private CheckpointingMode checkpointingMode;
 	private boolean checkpointingEnabled = false;
-	private long checkpointingInterval = 5000;
+	private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
 	private boolean chaining = true;
 
 	private Map<Integer, StreamNode> streamNodes;

http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c988150..5551bf3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -269,7 +269,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
-		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
 		config.setStateHandleProvider(streamGraph.getStateHandleProvider());
 		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
 


Mime
View raw message