flink-user mailing list archives

From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: normalizing DataSet with cross()
Date Tue, 22 Mar 2016 14:28:47 GMT
From the code extract I cannot tell what could be wrong, because the code
looks OK. If ds changes, then your normalization result should change as
well, I would assume.
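
To check what the normalized output should look like independently of Flink, here is a minimal plain-Java sketch. It assumes the matrix entries fit in memory and that the maximum is non-zero. The class `NormalizeByMaxSketch`, its `Entry` type, and the method `normalizeByMax` are hypothetical names for illustration only; they are not part of Flink or of the program above. It computes the global maximum once and then divides every value by it, so repeated runs on the same input give the same output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reference helper for illustration; not part of Flink.
final class NormalizeByMaxSketch {

    /** One matrix entry (row, column, value), mirroring Tuple3<Integer, Integer, Double>. */
    static final class Entry {
        final int row;
        final int col;
        final double value;

        Entry(int row, int col, double value) {
            this.row = row;
            this.col = col;
            this.value = value;
        }
    }

    /** Divides every value by the global maximum of all values. */
    static List<Entry> normalizeByMax(List<Entry> entries) {
        // First pass: find the global maximum of the value field.
        double max = Double.NEGATIVE_INFINITY;
        for (Entry e : entries) {
            max = Math.max(max, e.value);
        }

        // Second pass: divide each value by that maximum.
        // Assumption: max is non-zero; otherwise the division yields NaN or Infinity.
        // An empty input simply produces an empty result.
        List<Entry> result = new ArrayList<>(entries.size());
        for (Entry e : entries) {
            result.add(new Entry(e.row, e.col, e.value / max));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> input = Arrays.asList(
                new Entry(0, 0, 2.0),
                new Entry(0, 1, 4.0),
                new Entry(1, 0, 1.0));
        for (Entry e : normalizeByMax(input)) {
            System.out.println(e.row + "," + e.col + "," + e.value);
        }
    }
}
```

Comparing the Flink job's output against such a reference on a small input file may help to narrow down whether the variation comes from the normalization step or from an earlier step.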

On Tue, Mar 22, 2016 at 3:15 PM, Lydia Ickler <icklerly@googlemail.com>
wrote:

> Hi Till,
>
> maybe this happens because I overwrite ds again in the next step, so
> that the processing steps get mixed up?
> I am reading the data from a local .csv file with readMatrix(env,
> "filename").
>
> See code below.
>
> Best regards,
> Lydia
>
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> ds = readMatrix(env, input);
>
> /****************
>  POWER ITERATION
>  *****************/
>
> //get initial vector - which equals matrixA * [1, ... , 1]
> DataSet<Tuple3<Integer, Integer, Double>> initial = ds.groupBy(0).aggregate(Aggregations.SUM, 2);
>
> //normalize by maximum value
> initial = initial.cross(initial.aggregate(Aggregations.MAX, 2)).map(new normalizeByMax());
>
> public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>                                                                       String filePath) {
>     CsvReader csvReader = env.readCsvFile(filePath);
>     csvReader.fieldDelimiter(",");
>     csvReader.includeFields("ttt");
>     return csvReader.types(Integer.class, Integer.class, Double.class);
> }
>
>
> On 22.03.2016 at 14:47, Till Rohrmann <trohrmann@apache.org> wrote:
>
> Hi Lydia,
>
> I tried to reproduce your problem but I couldn't. Could it be that you
> have a non-deterministic operation somewhere in your program, or that you
> read the data from a source with varying data? Maybe you could send us a
> complete, compilable program which reproduces your problem.
>
> Cheers,
> Till
>
> On Tue, Mar 22, 2016 at 2:21 PM, Lydia Ickler <icklerly@googlemail.com>
> wrote:
>
>> Hi all,
>>
>> I have a question.
>> If I have a DataSet<Tuple3<Integer, Integer, Double>> ds and I want to
>> normalize all values (at position 2) in it by the maximum of the
>> DataSet (ds.aggregate(Aggregations.MAX, 2)), how do I tackle that?
>>
>> If I use the cross operator, my result changes every time I run the
>> program (see code below).
>> Any suggestions?
>>
>> Thanks in advance!
>> Lydia
>>
>> ds.cross(ds.aggregate(Aggregations.MAX, 2)).map(new normalizeByMax());
>>
>> public static final class normalizeByMax implements
>>         MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>>                 Tuple3<Integer, Integer, Double>> {
>>
>>     public Tuple3<Integer, Integer, Double> map(
>>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>>             throws Exception {
>>         return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / value.f1.f2);
>>     }
>> }
>>
>>
>>
>>
>
>
