flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
Date Tue, 25 Nov 2014 08:47:53 GMT
Hi,

I am trying to run this code:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    MyTableInputFormat inputFormat = new MyTableInputFormat();

    DataSource<Record> dataset = env.createInput(inputFormat);

    // rebalance() sits between the filter and the flatMap; the
    // DataSource -> Filter chain then fails to deploy (see trace below)
    DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
            .filter(new EmptyEntityFilterFunction())
            .rebalance()
            .flatMap(new FindCandidateWithMatchFlagMapFunction<>());

    DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
            .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
            .map(new MapToTuple3MapFunction<>());

    DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
            .distinct(0, 1)
            .groupBy(0)
            .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());

    duplicatesToprint.writeAsText("file:///tmp/"
            + EnsMaintenanceConstants.WORKING_TABLE + "/",
            WriteMode.OVERWRITE);

    env.execute();
}

but it fails right away with the exception below. The API documentation says
that rebalance() can be used as input to map functions, so it is not clear to
me what I am doing wrong, unless rebalancing is actually not allowed here. In
that case, I guess it should not be available in the API :-)

Please let me know how I can use rebalance.
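For reference, my understanding is that rebalance() is meant to redistribute records round-robin over the parallel instances of the following operator, so that each instance receives roughly the same share regardless of record content. The plain-Java sketch below only illustrates that idea: `RebalanceSketch`, `RoundRobinSelector` and `rebalance` are hypothetical names, not Flink classes, and Flink's real channel selection may differ in details (for example, which channel it starts from).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Conceptual sketch (hypothetical, not Flink code) of a forced rebalance:
 * records are dealt round-robin to the parallel channels.
 */
public class RebalanceSketch {

    /** Hypothetical helper: cycles through channel indices 0..numChannels-1. */
    static final class RoundRobinSelector {
        private final int numChannels;
        private int next = 0;

        RoundRobinSelector(int numChannels) {
            if (numChannels <= 0) {
                throw new IllegalArgumentException("numChannels must be positive");
            }
            this.numChannels = numChannels;
        }

        /** Returns the current channel and advances to the next one. */
        int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    /** Distributes the records over numChannels lists, round-robin. */
    static <T> List<List<T>> rebalance(List<T> records, int numChannels) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        RoundRobinSelector selector = new RoundRobinSelector(numChannels);
        for (T record : records) {
            channels.get(selector.selectChannel()).add(record);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c", "d", "e", "f", "g");
        // prints [[a, d, g], [b, e], [c, f]]
        System.out.println(rebalance(records, 3));
    }
}
```

With 7 records and 3 channels, no channel gets more than one record above any other, which is the skew-removing property rebalance() is used for.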

Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE): java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
    at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
    at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
    at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
    at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
    at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
    ... 7 more
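Reading the trace, the IllegalArgumentException is thrown from the RecordOutputEmitter constructor while the DataSource's outputs are being initialized, i.e. the shipping strategy is rejected at deployment time, before any record is shipped. A toy illustration of that fail-fast pattern follows; `ToyEmitter`, `ToyShipStrategy` and its set of supported strategies are hypothetical and are not Flink's API.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Toy illustration (hypothetical names, not Flink classes): an emitter that
 * validates its shipping strategy in the constructor, so an unsupported
 * strategy fails when outputs are initialized, before any record is sent.
 */
public class ToyEmitterSketch {

    enum ToyShipStrategy { FORWARD, PARTITION_HASH, BROADCAST, PARTITION_FORCED_REBALANCE }

    static final class ToyEmitter {
        // Made-up set of strategies this toy emitter can implement.
        private static final Set<ToyShipStrategy> SUPPORTED = EnumSet.of(
                ToyShipStrategy.FORWARD,
                ToyShipStrategy.PARTITION_HASH,
                ToyShipStrategy.BROADCAST);

        private final ToyShipStrategy strategy;

        ToyEmitter(ToyShipStrategy strategy) {
            if (!SUPPORTED.contains(strategy)) {
                // Reject unsupported strategies up front, mirroring the trace's message text.
                throw new IllegalArgumentException(
                        "Invalid shipping strategy for OutputEmitter: " + strategy);
            }
            this.strategy = strategy;
        }

        ToyShipStrategy strategy() {
            return strategy;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ToyEmitter(ToyShipStrategy.FORWARD).strategy());
        try {
            new ToyEmitter(ToyShipStrategy.PARTITION_FORCED_REBALANCE);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints FORWARD and then "Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE".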
