Sorry, I was not clear:
I meant that the DataSet named initial is changing, not ds. :)

On 22.03.2016 at 15:28, Till Rohrmann <> wrote:

From the code extract I cannot tell what could be wrong, because the code looks OK. If ds changes, then your normalization result should change as well, I would assume.

On Tue, Mar 22, 2016 at 3:15 PM, Lydia Ickler <> wrote:
Hi Till,

Maybe this happens because I reassign the ds again in the next step and the processing steps then get mixed up?
I am reading the data from a local .csv file with readMatrix(env, "filename").

See code below.

Best regards,

//read input file
DataSet<Tuple3<Integer, Integer, Double>> ds = readMatrix(env, input);

//get initial vector - which equals matrixA * [1, ... , 1]
//(assumes the intended call was groupBy(0): sum the values per row index)
DataSet<Tuple3<Integer, Integer, Double>> initial = ds.groupBy(0).aggregate(Aggregations.SUM, 2);

//normalize by maximum value
initial = initial.cross(initial.aggregate(Aggregations.MAX, 2)).map(new normalizeByMax());

public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
        String filePath) {
    CsvReader csvReader = env.readCsvFile(filePath);
    return csvReader.types(Integer.class, Integer.class, Double.class);
}
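
For reference, the "initial vector" step described in the comments above (the row sums of the matrix, i.e. matrixA * [1, ..., 1]) can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: the class InitialVectorSketch, its Entry type, the method names, and the 2x2 sample matrix are all made up here and are not part of the poster's program. The row sums depend only on the input entries, so the result should be identical on every run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch; all names and sample values are illustrative only.
final class InitialVectorSketch {

    // One sparse matrix entry (row, column, value), mirroring Tuple3<Integer, Integer, Double>.
    static final class Entry {
        final int row;
        final int col;
        final double value;

        Entry(int row, int col, double value) {
            this.row = row;
            this.col = col;
            this.value = value;
        }
    }

    // Row sums of the matrix, i.e. matrixA * [1, ..., 1], keyed by row index.
    // A TreeMap keeps the rows in ascending order.
    static Map<Integer, Double> rowSums(List<Entry> matrix) {
        Map<Integer, Double> sums = new TreeMap<>();
        for (Entry e : matrix) {
            sums.merge(e.row, e.value, Double::sum);
        }
        return sums;
    }

    // Made-up 2x2 sample matrix: [[1, 2], [4, 2]].
    static List<Entry> sampleMatrix() {
        List<Entry> matrix = new ArrayList<>();
        matrix.add(new Entry(0, 0, 1.0));
        matrix.add(new Entry(0, 1, 2.0));
        matrix.add(new Entry(1, 0, 4.0));
        matrix.add(new Entry(1, 1, 2.0));
        return matrix;
    }

    public static void main(String[] args) {
        // Prints the row sums of the sample matrix.
        System.out.println(rowSums(sampleMatrix()));
    }
}
```

The sketch keeps only the row index and the sum, because the column index has no meaning once a whole row has been summed.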

On 22.03.2016 at 14:47, Till Rohrmann <> wrote:

Hi Lydia,

I tried to reproduce your problem but I couldn't. Could it be that you have a non-deterministic operation somewhere in your program, or that you read the data from a source with varying data? Maybe you could send us a complete, compilable program which reproduces your problem.


On Tue, Mar 22, 2016 at 2:21 PM, Lydia Ickler <> wrote:
Hi all,

I have a question.
I have a DataSet<Tuple3<Integer, Integer, Double>> ds and I want to normalize all values in it (at position 2) by the maximum of the DataSet (ds.aggregate(Aggregations.MAX, 2)).
How do I tackle that?

If I use the cross operator, my result changes every time I run the program (see code below).
Any suggestions?

Thanks in advance!
ds.cross(ds.aggregate(Aggregations.MAX, 2)).map(new normalizeByMax());

public static final class normalizeByMax implements
        MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                Tuple3<Integer, Integer, Double>> {

    //value.f0 is an element of ds, value.f1 is the tuple holding the maximum
    @Override
    public Tuple3<Integer, Integer, Double> map(
            Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
            throws Exception {
        return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / value.f1.f2);
    }
}
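
As a point of comparison, the intended normalization (divide the value at position 2 of every tuple by the maximum of that position) can be written as a minimal plain-Java sketch. Everything here is an assumption for illustration: the class NormalizeByMaxSketch, its Triple type, and the sample values are not from the original program, and the sketch assumes a non-empty input whose maximum is not zero. It may serve as a reference result when checking whether the Flink job's output really differs between runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch; all names and sample values are illustrative only.
final class NormalizeByMaxSketch {

    // Mirrors Tuple3<Integer, Integer, Double> with the same field names f0, f1, f2.
    static final class Triple {
        final int f0;
        final int f1;
        final double f2;

        Triple(int f0, int f1, double f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    static List<Triple> normalizeByMax(List<Triple> ds) {
        // First pass: the maximum of field 2 over the whole data set.
        double max = Double.NEGATIVE_INFINITY;
        for (Triple t : ds) {
            max = Math.max(max, t.f2);
        }
        // Second pass: divide every value by that maximum, keeping both indices.
        // Assumes max != 0; otherwise the division yields NaN or an infinity.
        List<Triple> result = new ArrayList<>();
        for (Triple t : ds) {
            result.add(new Triple(t.f0, t.f1, t.f2 / max));
        }
        return result;
    }

    // Made-up sample input with values 2.0, 4.0 and 8.0.
    static List<Triple> sample() {
        List<Triple> ds = new ArrayList<>();
        ds.add(new Triple(0, 0, 2.0));
        ds.add(new Triple(0, 1, 4.0));
        ds.add(new Triple(1, 0, 8.0));
        return ds;
    }
}
```

With the sample input the maximum is 8.0, so the normalized values are 0.25, 0.5 and 1.0, and the largest entry always maps to exactly 1.0.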