flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [FLINK-4199] fix misleading CLI messages during job submission
Date Tue, 19 Jul 2016 15:58:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master e85f787b2 -> 082d87e51


[FLINK-4199] fix misleading CLI messages during job submission

- change CLI message upon cluster retrieval
- save JobExecutionResult for interactive executions
- only print Collection size in accumulator results
- remove unused helper method

This closes #2264


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17589d45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17589d45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17589d45

Branch: refs/heads/master
Commit: 17589d454d00efa43cdf6116ea29ff4f513b6f20
Parents: e85f787
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Jul 19 09:51:23 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Jul 19 18:00:04 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  2 +-
 .../flink/client/program/ClusterClient.java     | 12 +++++-----
 .../common/accumulators/AccumulatorHelper.java  | 24 +++++++++++---------
 3 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a4691c9..a888841 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -884,7 +884,7 @@ public class CliFrontend {
 		ClusterClient client;
 		try {
 			client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
-			logAndSysout("Cluster retrieved: " + client.getClusterIdentifier());
+			logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
 		} catch (UnsupportedOperationException e) {
 			try {
 				String applicationName = "Flink Application: " + programName;

http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 6cb5abb..2e6a9cc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -97,11 +97,11 @@ public abstract class ClusterClient {
 	private boolean printStatusDuringExecution = true;
 
 	/**
-	 * For interactive invocations, the Job ID is only available after the ContextEnvironment
has
+	 * For interactive invocations, the job results are only available after the ContextEnvironment
has
 	 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
-	 * which lets us access the last JobID here.
+	 * which lets us access the execution result here.
 	 */
-	private JobID lastJobID;
+	private JobExecutionResult lastJobExecutionResult;
 
 	/** Switch for blocking/detached job submission of the client */
 	private boolean detachedJobSubmission = false;
@@ -335,7 +335,7 @@ public abstract class ClusterClient {
 				}
 				else {
 					// in blocking mode, we execute all Flink jobs contained in the user code and then return
here
-					return new JobSubmissionResult(lastJobID);
+					return this.lastJobExecutionResult;
 				}
 			}
 			finally {
@@ -406,9 +406,9 @@ public abstract class ClusterClient {
 
 		try {
 			logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job
completion.");
-			this.lastJobID = jobGraph.getJobID();
-			return JobClient.submitJobAndWait(actorSystemLoader.get(),
+			this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(),
 				leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
+			return this.lastJobExecutionResult;
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index bcae504..1a87235 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -114,19 +115,20 @@ public class AccumulatorHelper {
 	public static String getResultsFormated(Map<String, Object> map) {
 		StringBuilder builder = new StringBuilder();
 		for (Map.Entry<String, Object> entry : map.entrySet()) {
-			builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
-			builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
-		}
-		return builder.toString();
-	}
-
-	public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>>
accumulators) {
-		if (accumulators != null) {
-			for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet())
{
-				entry.getValue().resetLocal();
+			builder
+				.append("- ")
+				.append(entry.getKey())
+				.append(" (")
+				.append(entry.getValue().getClass().getName())
+				.append(")");
+			if (entry.getValue() instanceof Collection) {
+				builder.append(" [").append(((Collection) entry.getValue()).size()).append(" elements]");
+			} else {
+				builder.append(": ").append(entry.getValue().toString());
 			}
-			accumulators.clear();
+			builder.append(System.lineSeparator());
 		}
+		return builder.toString();
 	}
 
 	public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?,
?>> accumulators) {


Mime
View raw message