flink-dev mailing list archives

From Janani Chakkaradhari <janani.cs...@gmail.com>
Subject Convergence Criterion in Bulk Iteration - Java API
Date Fri, 04 Jul 2014 09:55:56 GMT
Hi,

I am trying to use a ConvergenceCriterion to check the L1 norm of the line rank
vector (similar to the PageRank vector) as a termination condition for the
iteration. Below I have pasted the code for the iteration and the convergence
check. It works fine locally on a smaller dataset (in the KBs), and the
iteration converges in a reasonable number of iterations. But when I run it on
a slightly larger dataset (4 MB), just 8 iterations took 1 hour and the job
keeps running.

So I tried running the same code with the L1-norm convergence criterion
replaced by a FilterFunction (similar to the EpsilonFilter in the PageRankBasic
example) as the termination condition, and now the job finishes very quickly.
It seems that the convergence criterion I am using for termination is making
the job very slow. Can someone give suggestions on this, or is there another
way to do it?
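
For comparison, the EpsilonFilter approach mentioned above terminates the
iteration once the set of elements whose score changed by more than some
epsilon becomes empty. A minimal plain-Java sketch of that per-element check
(outside Flink; the class name, data, and epsilon value here are made up for
illustration):

```java
import java.util.Map;

public class EpsilonFilterSketch {
    static final double EPSILON = 1e-4; // hypothetical threshold

    // Mimics the EpsilonFilter semantics: count the elements whose score
    // changed by more than EPSILON between two supersteps. In a Flink bulk
    // iteration with a termination-criterion DataSet, the loop ends once
    // this filtered set is empty.
    static long countStillChanging(Map<Long, Double> prev, Map<Long, Double> current) {
        return current.entrySet().stream()
                .filter(e -> Math.abs(e.getValue() - prev.get(e.getKey())) > EPSILON)
                .count();
    }

    public static void main(String[] args) {
        Map<Long, Double> prev = Map.of(1L, 0.30, 2L, 0.70);
        Map<Long, Double> curr = Map.of(1L, 0.30005, 2L, 0.69995);
        // Both entries moved by less than EPSILON, so the loop would terminate.
        System.out.println(countStillChanging(prev, curr)); // prints 0
    }
}
```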

Thanks,
Janani

        // Initialize random vector of size m x 1
        DataSet<Tuple2<Long, Double>> edgeScores =
                d.map(new InitializeRandomVector()).name("V");

        IterativeDataSet<Tuple2<Long, Double>> iteration = edgeScores
                .iterate(maxIterations)
                .registerAggregationConvergenceCriterion(L1_NormDiff.AGGREGATOR_NAME,
                        DoubleSumAggregator.class, L1_NormConvergence.class)
                .name("EdgeScoreVector_BulkIteration");

        DataSet<Tuple2<Long, Double>> new_edgeScores = iteration
                .join(d).where(0).equalTo(0).with(new V1_HadamardProduct()).name("V1")
                .join(srcIncMat).where(0).equalTo(0).with(new V2_SrcIncWithV1()).name("V2")
                .groupBy(0).aggregate(Aggregations.SUM, 1)
                .join(tarIncMat).where(0).equalTo(1).with(new V3_TarIncWithV2()).name("V3")
                .map(new DampingMapper(c, numEdges))
                .join(iteration).where(0).equalTo(0).with(new L1_NormDiff()).name("L1_NORM");

        DataSet<Tuple2<Long, Double>> convergedVector =
                iteration.closeWith(new_edgeScores);


    public static final class L1_NormConvergence
            implements ConvergenceCriterion<DoubleValue> {

        @Override
        public boolean isConverged(int iteration, DoubleValue value) {
            double diff = value.getValue();
            return diff < EPSILON;
        }
    }

    public static final class L1_NormDiff extends
            JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {

        public static final String AGGREGATOR_NAME = "linerank.aggregator";

        private DoubleSumAggregator agg;

        @Override
        public void open(Configuration parameters) {
            agg = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
        }

        @Override
        public Tuple2<Long, Double> join(Tuple2<Long, Double> current,
                Tuple2<Long, Double> prev) throws Exception {
            agg.aggregate(Math.abs(prev.f1 - current.f1));
            return current;
        }
    }
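
Outside of Flink, the combined effect of L1_NormDiff and L1_NormConvergence in
one superstep boils down to summing the absolute element-wise differences
(what the DoubleSumAggregator accumulates) and comparing the total against
EPSILON. A small standalone sketch of that logic (the class name, data, and
epsilon value are illustrative, not from the job):

```java
import java.util.List;

public class L1NormSketch {
    static final double EPSILON = 1e-4; // illustrative threshold

    // What the DoubleSumAggregator accumulates across one superstep:
    // the L1 norm of (prev - current).
    static double l1Diff(List<Double> prev, List<Double> current) {
        double sum = 0.0;
        for (int i = 0; i < prev.size(); i++) {
            sum += Math.abs(prev.get(i) - current.get(i));
        }
        return sum;
    }

    // Equivalent of L1_NormConvergence.isConverged on the aggregated value.
    static boolean isConverged(double aggregated) {
        return aggregated < EPSILON;
    }

    public static void main(String[] args) {
        List<Double> prev = List.of(0.5, 0.5);
        List<Double> curr = List.of(0.50002, 0.49998);
        double diff = l1Diff(prev, curr); // 2e-5 + 2e-5 = 4e-5
        System.out.println(isConverged(diff)); // prints true
    }
}
```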
