flink-dev mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Implementing a list accumulator
Date Wed, 21 Jan 2015 17:32:20 GMT
The thing is that the DefaultCrossFunction always uses the same holder Tuple2 object, which
is then handed over to the flatMap() of the chained collect helper. I can see why it is OK for
the default functions to reuse "holder" objects, but it becomes problematic when they are
chained to an operator that keeps references to its input, as the collect helper does.
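
To illustrate the aliasing outside of Flink, here is a minimal sketch in plain Java. The
names Pair, collectWithoutCopy and collectWithCopy are hypothetical and only stand in for
the holder Tuple2, the cross function and the collect helper; this is not the actual Flink
code, just an assumption about the pattern that leads to the repeated entries.

```java
import java.util.ArrayList;
import java.util.List;

class HolderReuseSketch {

    // Hypothetical stand-in for a mutable Tuple2<Integer, Integer>.
    static final class Pair {
        int first;
        int second;

        Pair(int first, int second) {
            this.first = first;
            this.second = second;
        }

        Pair copy() {
            return new Pair(first, second);
        }

        @Override
        public String toString() {
            return "(" + first + "," + second + ")";
        }
    }

    // Crosses 1..n with 1..n through ONE reused holder and stores the holder itself.
    // Every list entry is the same reference, so all entries show the last values written.
    static List<Pair> collectWithoutCopy(int n) {
        List<Pair> out = new ArrayList<>();
        Pair holder = new Pair(0, 0);
        for (int i = 1; i <= n; i++) {
            for (int j = 1; j <= n; j++) {
                holder.first = i;
                holder.second = j;
                out.add(holder); // stores a reference, not a snapshot
            }
        }
        return out;
    }

    // Same loop, but the consumer copies the record before keeping it.
    static List<Pair> collectWithCopy(int n) {
        List<Pair> out = new ArrayList<>();
        Pair holder = new Pair(0, 0);
        for (int i = 1; i <= n; i++) {
            for (int j = 1; j <= n; j++) {
                holder.first = i;
                holder.second = j;
                out.add(holder.copy()); // snapshot of the current values
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectWithoutCopy(2)); // [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(collectWithCopy(2));    // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```

In this sketch the copy is made on the consuming side, since that is the only place that
knows the record will be kept. Whether the real fix belongs in the collect helper or in the
default functions is a separate question.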

On 21 Jan 2015, at 17:12, Ufuk Celebi <uce@apache.org> wrote:

> I just checked out your branch and ran it with a breakpoint set at the CollectHelper.
> If you look into the (list) accumulator, you see that the same object is always added to it.
> Strangely enough, object re-use is disabled in the config. I don't have time to look further
> into it now, but it seems to be a problem with the object re-use mode.
> 
> – Ufuk
> 
> On 20 Jan 2015, at 20:53, Max Michels <max@data-artisans.com> wrote:
> 
>> Hi everyone,
>> 
>> I'm running into some problems implementing an Accumulator that
>> returns the elements of a DataSet as a list.
>> 
>> https://github.com/mxm/flink/tree/count/collect
>> 
>> Basically, it works fine in this test case:
>> 
>> 
>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> 
>>   Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
>> 
>>   DataSet<Integer> data = env.fromElements(input);
>> 
>>   // count
>>   long numEntries = data.count();
>>   assertEquals(10, numEntries);
>> 
>>   // collect
>>   ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
>>   assertArrayEquals(input, list.toArray());
>> 
>> 
>> But with non-primitive objects strange results occur:
>> 
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableObjectReuse();
>> 
>> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
>> 
>> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
>> 
>> // count
>> long numEntries = data3.count();
>> assertEquals(100, numEntries);
>> 
>> // collect
>> ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer,
>> Integer>>) data3.collect();
>> 
>> System.out.println(list);
>> 
>> ....
>> 
>> Output:
>> 
>> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
>> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
>> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
>> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
>> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
>> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
>> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
>> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
>> 
>> I assume the problem is the clone() method of the ListAccumulator,
>> where we just create a shallow copy. This is fine for accumulators
>> which hold primitive values, like IntCounter, but here we have a real
>> object.
>> 
>> How do we work around this problem?
>> 
>> Best regards,
>> Max
> 

