Hi Stephan,
Thanks for the reply. I have pasted the code for the other variant below;
it runs faster than the one in my previous email. I have also tried to
answer your questions here.
*1) Your code uses an extra join for the convergence check. Does other
variant you talk about also introduce an additional join?*
Yes, but it is a filter join. In my case I don't want to filter the tuples
during the iteration. Instead, I want to check for convergence at the end
of every superstep and stop the entire iteration once the convergence check
is true.
*2) The join you introduce, does it increase the data volume (find multiple
matches partners per record) such that more records are emitted?*
No. In the code from the previous email, each tuple emitted from
DampingMapper is joined with exactly one tuple in the iteration. Hence the
final join (L1_NormDiff()) in the iteration emits exactly one tuple for
each join key.
For example, in the PageRank algorithm, the newly computed page rank of
each page (each tuple) is compared with its value from the previous
iteration, and the L1_NormDiff join computes the difference. I want to
aggregate this difference over all pages in the current iteration and
check, at the end of each superstep, whether the aggregated value is below
a certain threshold.
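The check described above can be sketched in plain Java, outside of any
iteration operator. This is only an illustration under assumptions: the
class name, the method names and the THRESHOLD value are hypothetical and
do not come from the original job, and pages are identified by array index
instead of a Long key.

```java
// Plain-Java sketch (no Flink): aggregate the per-page differences into a
// single L1 norm and compare it against a threshold.
final class L1ConvergenceSketch {

    // Hypothetical threshold; the email does not state a concrete value.
    static final double THRESHOLD = 1e-6;

    // Sum of |current[i] - previous[i]| over all pages (index = page id).
    static double l1NormDiff(double[] previous, double[] current) {
        if (previous.length != current.length) {
            throw new IllegalArgumentException("vectors must have equal length");
        }
        double sum = 0.0;
        for (int i = 0; i < previous.length; i++) {
            sum += Math.abs(current[i] - previous[i]);
        }
        return sum;
    }

    // True when the aggregated difference is below the threshold.
    static boolean hasConverged(double[] previous, double[] current,
                                double threshold) {
        return l1NormDiff(previous, current) < threshold;
    }

    public static void main(String[] args) {
        double[] previous = {0.25, 0.25, 0.5};
        double[] current = {0.25, 0.5, 0.25};
        System.out.println("L1 diff:   " + l1NormDiff(previous, current));
        System.out.println("converged: "
                + hasConverged(previous, current, THRESHOLD));
    }
}
```

Inside the iteration, the same sum would have to be computed over the
output of the L1_NormDiff join once per superstep.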
Bulk Iteration:
DataSet<Tuple2<Long, Double>> edgeScores =
        d.map(new InitializeRandomVector()).name("V");

IterativeDataSet<Tuple2<Long, Double>> iteration =
        edgeScores.iterate(maxIterations)
                .name("EdgeScoreVector_BulkIteration");

DataSet<Tuple2<Long, Double>> new_edgeScores = iteration
        .join(d).where(0).equalTo(0)
        .with(new V1_HadamardProduct()).name("V1")
        .join(srcIncMat).where(0).equalTo(0)
        .with(new V2_SrcIncWithV1()).name("V2")
        .groupBy(0).aggregate(Aggregations.SUM, 1)
        .join(tarIncMat).where(0).equalTo(1)
        .with(new V3_TarIncWithV2()).name("V3")
        .map(new DampingMapper(c, numEdges));

DataSet<Tuple2<Long, Double>> convergedVector = iteration.closeWith(
        new_edgeScores,
        new_edgeScores.join(iteration).where(0).equalTo(0)
                .filter(new EpsilonFilter()));
public static final class EpsilonFilter extends
        FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(
            Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        // Keep only pairs whose new and old scores differ by more
        // than EPSILON.
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
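With closeWith(result, terminationCriterion), the bulk iteration stops as
soon as the termination-criterion data set is empty. So the filter above
keeps the iteration running as long as at least one element changed by
more than EPSILON. A plain-Java sketch of that rule follows. The class
name, the method names and the EPSILON value are hypothetical, and
elements are matched by array index instead of a join on the key.

```java
// Plain-Java sketch (no Flink) of the per-element termination rule:
// continue while at least one element changed by more than epsilon.
final class EpsilonTerminationSketch {

    // Hypothetical value; the email does not show how EPSILON is defined.
    static final double EPSILON = 1e-3;

    // Number of positions whose old and new values differ by more than
    // epsilon. This corresponds to the size of the filtered data set.
    static int countAboveEpsilon(double[] previous, double[] current,
                                 double epsilon) {
        if (previous.length != current.length) {
            throw new IllegalArgumentException("vectors must have equal length");
        }
        int count = 0;
        for (int i = 0; i < previous.length; i++) {
            if (Math.abs(current[i] - previous[i]) > epsilon) {
                count++;
            }
        }
        return count;
    }

    // The iteration goes on only while the filtered set is non-empty.
    static boolean shouldContinue(double[] previous, double[] current,
                                  double epsilon) {
        return countAboveEpsilon(previous, current, epsilon) > 0;
    }

    public static void main(String[] args) {
        double[] previous = {0.5, 0.5};
        double[] current = {0.5, 0.75};
        System.out.println("continue: "
                + shouldContinue(previous, current, EPSILON));
    }
}
```

Note that this rule is per element: every single difference can stay below
EPSILON while the difference summed over all elements is still larger than
EPSILON, so it is not the same as a threshold on the aggregated L1 norm.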
Thanks,
Janani
On Mon, Jul 7, 2014 at 12:13 PM, Stephan Ewen <sewen@apache.org> wrote:
> Hi Janani!
>
> Can you give us the code of the other variant, for comparison?
>
> Also, some quick questions to help figuring this out:
>
> 1) Your code uses an extra join for the convergence check. Does other
> variant you talk about also introduce an additional join?
>
> 2) The join you introduce, does it increase the data volume (find multiple
> matches partners per record) such that more records are emitted?
>
> Stephan
>
