flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andra Lungu <lungu.an...@gmail.com>
Subject Re: Nested iterations not supported?
Date Tue, 18 Aug 2015 10:11:09 GMT
Hello Pieter,

Nested iterations are indeed not supported in Flink.
http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3Cop.xw24u7fhf7e33m@vaio-sb%3E

The problem is not in your code.


On Tue, Aug 18, 2015 at 11:27 AM, Pieter-Jan Van Aeken <
pieterjan.vanaeken@euranova.eu> wrote:

> Hello all,
>
>
> I am having some troubles getting nested iterations to work. The basic
> outline of my application looks like this :
>
> 1. create vertex dataset
> 2. create edge dataset
> 3. bulk iterate 100 times on edges {
>     3a. Create graph from nodes and edges
>     3b. Perform GatherSumApply (delta iteration)
>     3c. Map Vertices
>     3d. Perform GatherSumApply in other direction (again a delta iteration)
>     3e. Join with edges on target
>     3f. Output new edges
> }
> 4. write edges to file
>
> Am I correct in assuming that the two delta iterations (GSA) inside the
> bulk iteration are not allowed at this point in time? Or should I continue
> looking for bugs in my code? The stack trace doesn't help me all that much:
>
> Exception in thread "main" java.lang.IllegalStateException
>     at
>
> org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
>     at
>
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>     at
>
> org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
>     at
>
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
>     at
>
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>     at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
>     at
>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>     at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>     at
>
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)
>
>
> Regards,
>
> Pieter-Jan Van Aeken
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message