flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] incubator-flink git commit: [scala] Change ScalaAggregateOperator to use TypeSerializer
Date Sun, 14 Dec 2014 22:26:31 GMT
[scala] Change ScalaAggregateOperator to use TypeSerializer

This closes #263


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9e403667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9e403667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9e403667

Branch: refs/heads/master
Commit: 9e40366752ba958c562fca69f2afdf6a8ca54b2e
Parents: 0028238
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Dec 11 13:07:06 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Dec 14 16:09:55 2014 +0100

----------------------------------------------------------------------
 .../scala/operators/ScalaAggregateOperator.java | 29 ++++----------------
 1 file changed, 6 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9e403667/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 293b380..d352817 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
@@ -39,8 +38,6 @@ import org.apache.flink.api.java.operators.Grouping;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -164,18 +161,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 		}
 		genName.setLength(genName.length()-1);
 
-		TypeSerializer<IN> serializer = getInputType().createSerializer();
-		TypeSerializerFactory<IN> serializerFactory;
-		if (serializer.isStateful()) {
-			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
-					serializer, getInputType().getTypeClass());
-		} else {
-			serializerFactory = new RuntimeStatelessSerializerFactory<IN>(
-					serializer, getInputType().getTypeClass());
-		}
-
 		@SuppressWarnings("rawtypes")
-		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(serializerFactory, aggFunctions, fields);
+		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType().createSerializer(), aggFunctions, fields);
 
 
 		String name = getName() != null ? getName() : genName.toString();
@@ -251,17 +238,14 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 		private final AggregationFunction<Object>[] aggFunctions;
 
-		private final TypeSerializerFactory<T> serializerFactory;
+		private TupleSerializerBase<T> serializer;
 
-		private transient TupleSerializerBase<T> serializer;
-
-		public AggregatingUdf(TypeSerializerFactory<T> serializerFactory, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
-			Validate.notNull(serializerFactory);
+		public AggregatingUdf(TypeSerializer<T> serializer, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
+			Validate.notNull(serializer);
 			Validate.notNull(aggFunctions);
 			Validate.isTrue(aggFunctions.length == fieldPositions.length);
-			Validate.isTrue(serializerFactory.getSerializer() instanceof TupleSerializerBase);
-
-			this.serializerFactory = serializerFactory;
+			Validate.isInstanceOf(TupleSerializerBase.class, serializer, "Serializer for Scala Aggregate Operator must be a tuple serializer.");
+			this.serializer = (TupleSerializerBase<T>) serializer;
 			this.aggFunctions = aggFunctions;
 			this.fieldPositions = fieldPositions;
 		}
@@ -272,7 +256,6 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 			for (AggregationFunction<Object> aggFunction : aggFunctions) {
 				aggFunction.initializeAggregate();
 			}
-			serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
 		}
 
 		@Override


Mime
View raw message