flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
Date Tue, 25 Nov 2014 09:29:10 GMT
I just had a look at this.

Are you using the "Record" data type? The tooling for that type does not seem
to support this right now, but it is an easy fix...
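
For illustration only, here is a minimal sketch of how an emitter that lacks a rebalance code path could fail at construction time, while a supporting one hands out channels round-robin. This is not the actual Flink code: the class RebalanceSketch, the Emitter type, the supportsRebalance flag, and the FORWARD constant are all made up for this example. Only the strategy name PARTITION_FORCED_REBALANCE and the message text come from the error quoted below.

```java
import java.util.ArrayList;
import java.util.List;

public class RebalanceSketch {

    // Hypothetical strategy set; only PARTITION_FORCED_REBALANCE appears in the quoted error.
    enum Strategy { FORWARD, PARTITION_FORCED_REBALANCE }

    /** Toy emitter (an assumption, not Flink's RecordOutputEmitter). */
    static final class Emitter {
        private final Strategy strategy;
        private int nextChannel = 0;

        Emitter(Strategy strategy, boolean supportsRebalance) {
            // An emitter without a rebalance code path rejects the strategy up front.
            if (strategy == Strategy.PARTITION_FORCED_REBALANCE && !supportsRebalance) {
                throw new IllegalArgumentException(
                        "Invalid shipping strategy for OutputEmitter: " + strategy);
            }
            this.strategy = strategy;
        }

        /** Returns the index of the output channel for the next element. */
        int selectChannel(int numChannels) {
            if (strategy == Strategy.FORWARD) {
                return 0; // forward everything to the single local channel
            }
            // Rebalance: cycle through the channels in round-robin order.
            int channel = nextChannel;
            nextChannel = (nextChannel + 1) % numChannels;
            return channel;
        }
    }

    public static void main(String[] args) {
        Emitter supported = new Emitter(Strategy.PARTITION_FORCED_REBALANCE, true);
        List<Integer> channels = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            channels.add(supported.selectChannel(3));
        }
        System.out.println(channels); // prints [0, 1, 2, 0, 1, 2]

        try {
            new Emitter(Strategy.PARTITION_FORCED_REBALANCE, false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the failure happens in the constructor, which would match an error showing up at task deployment rather than while records are being processed.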
On 25.11.2014 09:48, "Stefano Bortoli" <s.bortoli@gmail.com> wrote:

> Hi,
>
> I am trying to run this code:
>
> public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment
>                 .getExecutionEnvironment();
>
>         MyTableInputFormat inputFormat = new MyTableInputFormat();
>
>         DataSource<Record> dataset = env.createInput(inputFormat);
>
>         DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
>                 .filter(new EmptyEntityFilterFunction()).rebalance()
>                 .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>
>         DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
>                 .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
>                 .map(new MapToTuple3MapFunction<>());
>
>         DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
>                 .distinct(0, 1)
>                 .groupBy(0)
>                 .reduceGroup(
>                         new ConsolidateByTypeDuplicatesGroupReduceFunction());
>
>         duplicatesToprint.writeAsText("file:///tmp/"
>                 + EnsMaintenanceConstants.WORKING_TABLE + "/",
>                 WriteMode.OVERWRITE);
>
>         env.execute();
>     }
>
> but it fails right away with the exception below. The API documentation says
> that rebalance can be used as the input of map functions. It is not clear to
> me what I am doing wrong, unless rebalancing is actually illegal here. In that
> case, I guess it should not be available in the API :-)
>
> Please let me know how I can use rebalance.
>
> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource
> (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter
> (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to
> slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE):
> java.lang.RuntimeException: The initialization of the DataSource's outputs
> caused an error: Invalid shipping strategy for OutputEmitter:
> PARTITION_FORCED_REBALANCE
> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
> at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
> at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
> at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
> Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy
> for OutputEmitter: PARTITION_FORCED_REBALANCE
> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
> at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
> at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
> at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
> ... 7 more
>
