flink-dev mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-1254) Optimizer bug during pipeline breaker placement
Date Wed, 19 Nov 2014 11:27:33 GMT
Stephan Ewen created FLINK-1254:
-----------------------------------

             Summary: Optimizer bug during pipeline breaker placement
                 Key: FLINK-1254
                 URL: https://issues.apache.org/jira/browse/FLINK-1254
             Project: Flink
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 0.8-incubating
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 0.8-incubating


The compiler fails on certain programs when trying to place pipeline breakers.

This code reproduces the error:

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);

// the workset (input two of the delta iteration) is the same as what is consumed by the successive join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path")
	.types(Long.class)
	.map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path")
	.types(Long.class)
	.map(new DuplicateValue());

// trivial iteration, since we are interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
	initialSolutionSet.iterateDelta(initialWorkset, 100, 0);

DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());

DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

initialWorkset
	.join(result, JoinHint.REPARTITION_HASH_FIRST)
	.where(0).equalTo(0)
	.print();

Plan p = env.createProgramPlan();
compileNoStats(p);
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
