flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsaputra <...@git.apache.org>
Subject [GitHub] incubator-flink pull request: [scala] Change ScalaAggregateOperato...
Date Thu, 11 Dec 2014 16:57:49 GMT
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/263#discussion_r21690247
  
    --- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
---
    @@ -164,18 +160,8 @@ public ScalaAggregateOperator(Grouping<IN> input, Aggregations
function, int fie
     		}
     		genName.setLength(genName.length()-1);
     
    -		TypeSerializer<IN> serializer = getInputType().createSerializer();
    -		TypeSerializerFactory<IN> serializerFactory;
    -		if (serializer.isStateful()) {
    -			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
    -					serializer, getInputType().getTypeClass());
    -		} else {
    -			serializerFactory = new RuntimeStatelessSerializerFactory<IN>(
    -					serializer, getInputType().getTypeClass());
    -		}
    -
     		@SuppressWarnings("rawtypes")
    -		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(serializerFactory,
aggFunctions, fields);
    +		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf((TupleSerializerBase)
getInputType().createSerializer(), aggFunctions, fields);
    --- End diff --
    
    Ah sorry, I meant getIntputType call returns TypeSerializer and the AggregatingUdf constructor
is taking TupleSerializerBase so could the constructor parameter takes TypeSerializer instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message