flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-1530] Cache deserialized ExecutionConfig in the AbstractInvokable
Date Wed, 11 Feb 2015 19:43:24 GMT
[FLINK-1530] Cache deserialized ExecutionConfig in the AbstractInvokable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69b79456
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69b79456
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69b79456

Branch: refs/heads/master
Commit: 69b794561c151261d48f7fa5463c3b24894a8efa
Parents: 18c9082
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 11 18:19:35 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 20:42:43 2015 +0100

----------------------------------------------------------------------
 .../jobgraph/tasks/AbstractInvokable.java       | 38 +++++++++++++-------
 1 file changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69b79456/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index e7e9de6..c63139a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -26,18 +26,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract base class for every task class in Flink.
+ * This is the abstract base class for every task that can be executed by a TaskManager.
+ * Concrete tasks like the stream vertices or the batch tasks
+ * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class.
+ *
+ * The TaskManager invokes the methods {@link #registerInputOutput()} and {@link #invoke()} in
+ * this order when executing a task. The first method is responsible for setting up input and
+ * output stream readers and writers, the second method contains the task's core operation.
  */
 public abstract class AbstractInvokable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokable.class);
 
 
-	/**
-	 * The environment assigned to this invokable.
-	 */
+	/** The environment assigned to this invokable. */
 	private volatile Environment environment;
 
+	/** The execution config, cached from the deserialization from the JobConfiguration */
+	private ExecutionConfig executionConfig;
+
+
 	/**
 	 * Must be overwritten by the concrete task to instantiate the required record reader and record writer.
 	 */
@@ -48,7 +56,7 @@ public abstract class AbstractInvokable {
 	 * when the actual execution of the task starts.
 	 * 
 	 * @throws Exception
-	 *         thrown if any exception occurs during the execution of the tasks
+	 *         Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
 	 */
 	public abstract void invoke() throws Exception;
 
@@ -118,20 +126,26 @@ public abstract class AbstractInvokable {
 	}
 
 	/**
-	 * Returns the global ExecutionConfig.
+	 * Returns the global ExecutionConfig, obtained from the job configuration.
 	 */
 	public ExecutionConfig getExecutionConfig() {
+		if (executionConfig != null) {
+			return executionConfig;
+		}
+
 		try {
-			ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
+			executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
 					getJobConfiguration(),
 					ExecutionConfig.CONFIG_KEY,
 					this.getClass().getClassLoader());
-			if (c != null) {
-				return c;
-			} else {
-				return new ExecutionConfig();
+
+			if (executionConfig == null) {
+				LOG.warn("Environment did not contain an ExecutionConfig - using a default config.");
+				executionConfig = new ExecutionConfig();
 			}
-		} catch (Exception e) {
+			return executionConfig;
+		}
+		catch (Exception e) {
 			LOG.warn("Could not load ExecutionConfig from Environment, returning default ExecutionConfig: {}", e);
 			return new ExecutionConfig();
 		}


Mime
View raw message