flink-commits mailing list archives

From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation
Date Tue, 12 May 2015 23:15:54 GMT
[FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73493335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73493335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73493335

Branch: refs/heads/master
Commit: 73493335f4dbecbb4f1f9f954b08534a5e35ca90
Parents: cf4f22e
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue May 12 23:00:29 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 12 23:00:29 2015 +0200

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  | 14 +++---
 .../runtime/operators/RegularPactTask.java      | 46 ++++++++++++--------
 2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
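For context, the scenario this commit addresses is a rich function that registers an accumulator while being chained behind a non-UDF operation. The sketch below is purely illustrative and not part of this commit; the job layout, the names, and the choice of rebalance() as the non-UDF operation are assumptions made for the example.

// Hypothetical job: an accumulator registered in a rich function that is
// chained behind a non-UDF operation (here: rebalance()).
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class ChainedAccumulatorExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<String> result = env.fromElements("a", "b", "c")
			.rebalance()                       // non-UDF operation
			.map(new RichMapFunction<String, String>() {

				private final IntCounter counter = new IntCounter();

				@Override
				public void open(Configuration parameters) {
					// register the accumulator with the runtime context of the chained task
					getRuntimeContext().addAccumulator("num-records", counter);
				}

				@Override
				public String map(String value) {
					counter.add(1);
					return value;
				}
			});

		result.output(new DiscardingOutputFormat<String>());

		JobExecutionResult jobResult = env.execute("chained accumulator example");
		System.out.println("records seen: " + jobResult.getAccumulatorResult("num-records"));
	}
}

Before this change, the reporting path in RegularPactTask only collected chained accumulators when the core driver itself had a UDF with a runtime context, which is exactly what the hunks below relax.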


http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9b0e019..3e2e359 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -40,7 +40,8 @@ public class AccumulatorHelper {
 			if (ownAccumulator == null) {
 				// Take over counter from chained task
 				target.put(otherEntry.getKey(), otherEntry.getValue());
-			} else {
+			}
+			else {
 				// Both should have the same type
 				AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
 						ownAccumulator.getClass(), otherEntry.getValue().getClass());
@@ -122,12 +123,13 @@ public class AccumulatorHelper {
 		return builder.toString();
 	}
 
-	public static void resetAndClearAccumulators(
-			Map<String, Accumulator<?, ?>> accumulators) {
-		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
-			entry.getValue().resetLocal();
+	public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+		if (accumulators != null) {
+			for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
+				entry.getValue().resetLocal();
+			}
+			accumulators.clear();
 		}
-		accumulators.clear();
 	}
 
 	public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
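Reconstructed from the hunk above, the patched helper is now null-safe: a chained task can hand over a null accumulator map without causing a NullPointerException. Shown in isolation, with the surrounding class omitted:

	// Null-safe variant introduced by this commit: guard before iterating and clearing.
	public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
		if (accumulators != null) {
			for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
				entry.getValue().resetLocal();
			}
			accumulators.clear();
		}
	}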

http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c844d8e..1c3328e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
@@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -508,14 +510,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// JobManager. close() has been called earlier for all involved UDFs
 			// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
 			// modify accumulators;
-			if (this.stub != null) {
-				// collect the counters from the stub
-				if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
-					Map<String, Accumulator<?, ?>> accumulators =
-							FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
-					RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
-				}
-			}
+
+			// collect the counters from the udf in the core driver
+			Map<String, Accumulator<?, ?>> accumulators =
+					FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
+			
+			// collect accumulators from chained tasks and report them
+			reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
 		}
 		catch (Exception ex) {
 			// close the input, but do not report any exceptions, since we already have another root cause
@@ -572,16 +573,25 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// We can merge here the accumulators from the stub and the chained
 		// tasks. Type conflicts can occur here if counters with same name but
 		// different type were used.
-		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-				Map<String, Accumulator<?, ?>> chainedAccumulators =
-						FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
-				AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+		
+		if (!chainedTasks.isEmpty()) {
+			if (accumulators == null) {
+				accumulators = new HashMap<String, Accumulator<?, ?>>();
+			}
+			
+			for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
+				RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+				if (rc != null) {
+					Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators();
+					if (chainedAccumulators != null) {
+						AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+					}
+				}
 			}
 		}
 
 		// Don't report if the UDF didn't collect any accumulators
-		if (accumulators.size() == 0) {
+		if (accumulators == null || accumulators.size() == 0) {
 			return;
 		}
 
@@ -592,9 +602,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// (e.g. in iterations) and we don't want to count twice. This may not be
 		// done before sending
 		AccumulatorHelper.resetAndClearAccumulators(accumulators);
+		
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-				AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators());
+			RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+			if (rc != null) {
+				AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
 			}
 		}
 	}
@@ -1140,7 +1152,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			} catch (InterruptedException iex) {
 				throw new RuntimeException("Interrupted while waiting for input " + index + " to become
available.");
 			} catch (IOException ioex) {
-				throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
+				throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".");
 			}
 		}
 	}
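Taken together, the RegularPactTask hunks make the merge-and-report path independent of whether the core driver has a UDF. The following is a simplified sketch of the resulting logic, reconstructed from the hunks above; the exact method signature is paraphrased and the unchanged reporting call to the JobManager is abbreviated.

	// Simplified sketch of reportAndClearAccumulators after this change
	// (framework imports omitted; runs inside RegularPactTask).
	protected static void reportAndClearAccumulators(Environment env,
			Map<String, Accumulator<?, ?>> accumulators, List<ChainedDriver<?, ?>> chainedTasks) {

		if (!chainedTasks.isEmpty()) {
			if (accumulators == null) {
				// the core driver had no UDF, but chained tasks may still have accumulators
				accumulators = new HashMap<String, Accumulator<?, ?>>();
			}
			for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
				RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
				if (rc != null && rc.getAllAccumulators() != null) {
					// merge chained-task accumulators; type conflicts surface here
					AccumulatorHelper.mergeInto(accumulators, rc.getAllAccumulators());
				}
			}
		}

		// nothing to report if neither the driver UDF nor any chained task collected accumulators
		if (accumulators == null || accumulators.isEmpty()) {
			return;
		}

		// ... report 'accumulators' via 'env' (unchanged code, not shown in the diff) ...

		// reset local copies so repeated reporting (e.g. in iterations) does not double-count
		AccumulatorHelper.resetAndClearAccumulators(accumulators);
		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
			RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
			if (rc != null) {
				AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
			}
		}
	}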

