flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lydia Ickler <ickle...@googlemail.com>
Subject for loop slow
Date Sat, 26 Mar 2016 22:16:36 GMT
Hi,

I have an issue with a for-loop.
If I set the maximum number of loop iterations (the upper bound for i) to more than 3, the program gets stuck, and I cannot figure out why.
With 1, 2 or 3 it runs smoothly.
I have included the code below and marked the loop with //PROBLEM.

Thanks in advance!
Lydia

package org.apache.flink.contrib.lifescience.examples;

import edu.princeton.cs.algs4.Graph;
import edu.princeton.cs.algs4.SymbolDigraph;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

import java.util.*;

import static edu.princeton.cs.algs4.GraphGenerator.simple;

public class PowerIteration {

    //path to input
    static String input = null;
    //path to output
    static String output = null;
    //number of iterations (default = 7)
    static int iterations = 7;
    //threshold
    static double delta = 0.01;

    public void run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //read input file
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);

        DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue;

        //initial:
        //Approximate EigenVector by PowerIteration
        eigenVector = PowerIteration_getEigenVector(matrixA);
        //Approximate EigenValue by PowerIteration
        eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
        //Deflate original matrix
        matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);

        MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);

        MyResult next = null;

        //PROBLEM!!! get i eigenvalue gaps
        for(int i=0;i<2;i++){
            next = PowerIteration_routine(initial);
            initial = next;
            next.gap.print();
        }

        env.execute("Power Iteration");
    }

    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment
env,
                                                                          String filePath)
{
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(",");
        csvReader.includeFields("ttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f1.f1;
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    public static final class RQ implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer,
Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {

            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
        }
    }

    public static void main(String[] args) throws Exception {
        if(args.length<2 || args.length > 4){
            System.err.println("Usage: PowerIteration <input path> <result path>
optional: <iterations> <threshold diff>");
            System.exit(0);
        }

        input = args[0];
        output = args[1];

        if(args.length==3) {
            iterations = Integer.parseInt(args[2]);
        }
        if(args.length==4){
            delta = Double.parseDouble(args[3]);
        }

        new PowerIteration2().run();
    }

    public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer,
Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>>
{

        public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer,
Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {

            if(!(candidate.f2 == old.f2)){
                out.collect(candidate);
            }

            //if(Math.abs(candidate.f2-old.f2) > delta){
            //    out.collect(candidate);
            //}

        }
    }

    public static final class normalizeByMax implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer,
Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
        }
    }

    public static final class firstX implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer,
Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
        }
    }

    public static final class resetIndex implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
        }
    }


    public static final class decBy1 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
        }
    }

    public static final class resetIndex2 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
        }
    }

    public static final class MatrixTimesValue implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer,
Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {

            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
        }
    }

    public static final class MatrixMinusMatrix implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 - value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }

    public static final class getGapCenter implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer,
Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }


    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer,
Integer, Double>> matrixA) throws Exception {

        //get initial vector - which equals matrixA * [1, ... , 1]
        DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);

        //normalize by maximum value
        DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new
normalizeByMax());

        //BulkIteration to find dominant eigenvector
        IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);

        DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2)).groupBy(0).aggregate(Aggregations.SUM, 2).
                cross((matrixA.join(iteration).where(1).equalTo(0)
                        .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
                .map(new normalizeByMax());

        DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new
deltaFilter());
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector  = iteration.closeWith(intermediate,diffs);

        return eigenVector;
    }

    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer,
Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector)
{

        //determine now EigenValue by approximating the Rayleigh Quotient:
        //get Ax
        DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2).groupBy(0).aggregate(Aggregations.SUM, 2);
        //get Ax * x
        DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2).aggregate(Aggregations.SUM,2);

        //now x * x
        DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2).aggregate(Aggregations.SUM,2);

        return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
    }

    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer,
Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector,
DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {

        DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new
resetIndex());
        DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new
FilterFunction<Tuple3<Integer, Integer, Double>>() {
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        firstVal = eigenValueReset.cross(firstVal).map(new firstX());
        DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new
FilterFunction<Tuple3<Integer, Integer, Double>>() {
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new
MatrixTimesValue())).map(new DataSetUtils.transpose());
        DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new
MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
                map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM,
2);
        matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
        matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>()
{
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return (value.f0 != 0) && (value.f1 != 0);
            }
        });

        return matrixA.map(new decBy1());
    }

    public MyResult PowerIteration_routine(MyResult initial) throws Exception {

        //Approximate EigenVector by PowerIteration
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
        //Approximate EigenValue by PowerIteration
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA,
eigenVector);
        //get gap
        DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new
MatrixMinusMatrix());
        //Deflate original matrix
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);

        return new MyResult(eigenVector,eigenValue,matrixA,gap);
    }

    public class MyResult {
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
        DataSet<Tuple3<Integer, Integer, Double>> gap;
        DataSet<Tuple3<Integer, Integer, Double>> matrixA;

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector,
DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer,
Integer, Double>> matrixA){
            this.eigenVector = eigenVector;
            this.eigenValue =eigenValue;
            this.matrixA = matrixA;
        }

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector,
DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer,
Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
            this.eigenVector = eigenVector;
            this.eigenValue =eigenValue;
            this.matrixA = matrixA;
            this.gap = gap;
        }
    }

}


Mime
View raw message