flink-dev mailing list archives

From Jack David Galilee <jgal2...@uni.sydney.edu.au>
Subject Delta Iteration Obtuse Error Message
Date Wed, 27 Aug 2014 09:10:38 GMT
Hi Everyone,


I had an epiphany while reading through the Flink 0.6 API documentation and decided to try
a new approach for my iterative algorithm, but it only results in a stranger error. I've also
included the error I was getting with the suggestion that was posted earlier.


I'm sorry for not being able to provide the full source code. If it helps, all of my functions
now produce Tuple2<String, String>, and the initial dataset is also Tuple2<String, String>.
The goal is to write out the union of the results from all iterations, where the intersection of
the key sets of iteration i and iteration i - 1 is the empty set.
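For context, the delta-iteration contract that the snippet below relies on can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code. The `Pair` record stands in for `Tuple2<String, String>`. The solution set is keyed on field 0, matching the `0` key position passed to `iterateDelta`. Each delta record replaces the solution-set entry with the same key, and the loop stops once the workset is empty or `maxIterations` is reached. The step function `minLabelStep` (minimum-label propagation over a small graph) is a hypothetical example and is not Jack's algorithm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class DeltaIterationModel {
    // Stand-in for Tuple2<String, String>: f0 is the key, f1 the value.
    record Pair(String f0, String f1) {}

    // What one superstep produces: a delta for the solution set and the next workset.
    record Step(List<Pair> delta, List<Pair> nextWorkset) {}

    // Simplified model (not Flink code): the solution set is keyed on f0, each delta record
    // overwrites the entry with the same key, and the loop ends when the workset is empty
    // or maxIterations supersteps have run.
    static Map<String, Pair> iterate(List<Pair> solution, List<Pair> workset, int maxIterations,
                                     BiFunction<List<Pair>, Map<String, Pair>, Step> stepFunction) {
        Map<String, Pair> solutionSet = new LinkedHashMap<>();
        for (Pair p : solution) solutionSet.put(p.f0(), p);
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            Step step = stepFunction.apply(workset, solutionSet);
            for (Pair d : step.delta()) solutionSet.put(d.f0(), d); // merge delta by key
            workset = step.nextWorkset();
        }
        return solutionSet;
    }

    // Hypothetical step function: propagate the smallest label to each neighbour.
    static Step minLabelStep(List<Pair> workset, Map<String, Pair> solutionSet,
                             Map<String, List<String>> graph) {
        Map<String, String> candidates = new HashMap<>();
        for (Pair p : workset) {
            for (String n : graph.getOrDefault(p.f0(), List.of())) {
                candidates.merge(n, p.f1(), (a, b) -> a.compareTo(b) <= 0 ? a : b);
            }
        }
        List<Pair> delta = new ArrayList<>();
        for (Map.Entry<String, String> e : candidates.entrySet()) {
            // Only emit a change when the candidate label beats the current one.
            if (e.getValue().compareTo(solutionSet.get(e.getKey()).f1()) < 0) {
                delta.add(new Pair(e.getKey(), e.getValue()));
            }
        }
        return new Step(delta, delta); // changed vertices form the next workset
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = Map.of(
            "a", List.of("b"), "b", List.of("a", "c"), "c", List.of("b"),
            "d", List.of("e"), "e", List.of("d"));
        List<Pair> initial = new ArrayList<>();
        for (String v : List.of("a", "b", "c", "d", "e")) initial.add(new Pair(v, v));
        Map<String, Pair> result = iterate(initial, initial, 10,
            (ws, ss) -> minLabelStep(ws, ss, graph));
        // [Pair[f0=a, f1=a], Pair[f0=b, f1=a], Pair[f0=c, f1=a], Pair[f0=d, f1=d], Pair[f0=e, f1=d]]
        System.out.println(result.values());
    }
}
```

In this example the workset empties after three supersteps, well before the limit of 10. That is the usual way a delta iteration terminates.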


        DeltaIteration<Tuple2<String, String>, Tuple2<String, String>> iteration = transactions.
                iterateDelta(initial, maxIterations, 0);


        DataSet<Tuple2<String, String>> ... = ....
                flatMap(new ...()).withBroadcastSet(iteration.getWorkset(), "..."). // <-- Referencing the working set
                groupBy(0).
                reduceGroup(new ...()).
                withParameters(intValue).
                join(iteration.getSolutionSet()).where(0).equalTo(0).getInput1(); // <-- Referencing the solution set
//             projectFirst(1).projectSecond(1).types(String.class, String.class);


This raises the exception below. If I change it to getInput2(), I get the same error, but for the working
set, which is referenced through the broadcast.


Exception in thread "main" org.apache.flink.compiler.CompilerException: Error: The step function does not reference the solution set.
    at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:856)
    at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:616)
    at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
    at org.apache.flink.api.common.operators.base.GenericDataSinkBase.accept(GenericDataSinkBase.java:289)
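The first error is most likely a consequence of `getInput1()`. On a join, it returns the join's first input (here, the `reduceGroup` result) rather than the join itself. The join, and with it the reference to `iteration.getSolutionSet()`, therefore drops out of the step function. Likewise, `getInput2()` hands back the solution set itself, which no longer reaches the broadcast workset. The toy model below is not Flink's optimizer code, and `Op` and `references` are hypothetical names. It only illustrates the idea of walking an operator graph backwards from the step function's result to see whether the solution set (or workset) is reached.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class StepFunctionCheck {
    // Toy operator node: a name plus the operators it reads from.
    record Op(String name, List<Op> inputs) {
        static Op source(String name) { return new Op(name, List.of()); }
    }

    // True if `target` is reachable by following inputs backwards from `result`.
    static boolean references(Op result, Op target) {
        Deque<Op> stack = new ArrayDeque<>(List.of(result));
        Set<Op> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        while (!stack.isEmpty()) {
            Op op = stack.pop();
            if (op == target) return true;
            if (seen.add(op)) stack.addAll(op.inputs());
        }
        return false;
    }

    public static void main(String[] args) {
        Op workset = Op.source("workset");
        Op solutionSet = Op.source("solutionSet");
        // The broadcast workset is modeled as a second input of the flatMap.
        Op flatMap = new Op("flatMap", List.of(Op.source("..."), workset));
        Op reduce = new Op("groupBy(0).reduceGroup", List.of(flatMap));
        Op join = new Op("join", List.of(reduce, solutionSet));

        Op viaGetInput1 = join.inputs().get(0); // the reduceGroup, join is dropped
        Op viaGetInput2 = join.inputs().get(1); // the solution set itself

        System.out.println(references(join, solutionSet));         // true
        System.out.println(references(viaGetInput1, solutionSet)); // false
        System.out.println(references(viaGetInput2, workset));     // false
    }
}
```

Keeping the join itself (or a projection on it) as the step function's result keeps both the solution set and the workset on the path.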


If I remove the getInput1() call and uncomment the last line, it yields the error below. I was concerned that
I was accidentally writing out a null value somewhere, but I can't find where.


Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NullPointerException
    at org.apache.flink.api.java.operators.JoinOperator$ProjectFlatJoinFunction.join(JoinOperator.java:935)
    at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:143)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:510)
    at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:137)
    at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
    at java.lang.Thread.run(Thread.java:744)


After more investigation, it appears that the null pointer arises somewhere between the
reduceGroup operator and the next map operator, as the next map operator does not run after the reduceGroup.
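On the NullPointerException: the trace shows it is thrown inside the projection join function while `JoinWithSolutionSetSecondDriver` probes the solution set. One possibility is worth ruling out, stated here as an assumption rather than confirmed behavior. The driver may call the join function with `null` for the solution-set side when a probe record's key has no entry. If so, projecting that side would fail even if no null value is written anywhere. The plain-Java sketch below models that assumption. `joinWithSolutionSet`, `JoinFn`, and `NULL_SAFE_PROJECTION` are hypothetical names. The guard shows what a hand-written join function used in place of `projectFirst(1).projectSecond(1)` would need to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SolutionSetJoinSketch {
    // Stand-in for Tuple2<String, String>: f0 is the key, f1 the value.
    record Pair(String f0, String f1) {}

    // Hypothetical join-function shape: (probe record, solution-set match, output).
    interface JoinFn { void join(Pair probe, Pair match, List<Pair> out); }

    // Models probing a keyed solution set where a missing key yields a null match
    // (this is the assumption described above, not verified Flink behavior).
    static List<Pair> joinWithSolutionSet(List<Pair> probeSide, Map<String, Pair> solutionSet, JoinFn fn) {
        List<Pair> out = new ArrayList<>();
        for (Pair p : probeSide) fn.join(p, solutionSet.get(p.f0()), out);
        return out;
    }

    // Same output as projectFirst(1).projectSecond(1), but skips probes with no match.
    static final JoinFn NULL_SAFE_PROJECTION = (probe, match, out) -> {
        if (match == null) return; // no solution-set entry for this key
        out.add(new Pair(probe.f1(), match.f1()));
    };

    public static void main(String[] args) {
        Map<String, Pair> solution = Map.of("k1", new Pair("k1", "old"));
        List<Pair> probe = List.of(new Pair("k1", "new"), new Pair("k2", "x"));
        System.out.println(joinWithSolutionSet(probe, solution, NULL_SAFE_PROJECTION)); // [Pair[f0=new, f1=old]]
    }
}
```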



Thanks,

Jack
