flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
Date Tue, 25 Nov 2014 19:54:44 GMT
This problem should be fixed through
https://issues.apache.org/jira/browse/FLINK-1278

On Tue, Nov 25, 2014 at 11:02 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Great :)
>
> On Tue, Nov 25, 2014 at 10:59 AM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Yes, it makes sense to add the HBase fix to that.
>>
>> On Tue, Nov 25, 2014 at 10:55 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>
>>> Yes, because Stefano is working on the stable version. I saw that you are
>>> going to release the 7.1 version; do you think you could also include the
>>> new HBase add-on (the one that generates Tuples)?
>>>
>>> On Tue, Nov 25, 2014 at 10:31 AM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
>>>
>>>> Yes, I am using the Record data type. I can move the implementation to
>>>> Tuples if that is what is needed.
>>>>
>>>> Thanks for the tip! :-)
>>>>
>>>> Regards,
>>>> Stefano
>>>>
>>>> 2014-11-25 10:29 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>
>>>>> I just had a look at this.
>>>>>
>>>>> Are you using the "Record" data type? The tooling for that type does not
>>>>> seem to support this right now, but it is an easy fix...
>>>>> On 25.11.2014 09:48, "Stefano Bortoli" <s.bortoli@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to run this code:
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>>     ExecutionEnvironment env = ExecutionEnvironment
>>>>>>             .getExecutionEnvironment();
>>>>>>
>>>>>>     MyTableInputFormat inputFormat = new MyTableInputFormat();
>>>>>>
>>>>>>     DataSource<Record> dataset = env.createInput(inputFormat);
>>>>>>
>>>>>>     DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
>>>>>>             .filter(new EmptyEntityFilterFunction()).rebalance()
>>>>>>             .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>>>>>>
>>>>>>     DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
>>>>>>             .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
>>>>>>             .map(new MapToTuple3MapFunction<>());
>>>>>>
>>>>>>     DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
>>>>>>             .distinct(0, 1)
>>>>>>             .groupBy(0)
>>>>>>             .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());
>>>>>>
>>>>>>     duplicatesToprint.writeAsText("file:///tmp/"
>>>>>>             + EnsMaintenanceConstants.WORKING_TABLE + "/",
>>>>>>             WriteMode.OVERWRITE);
>>>>>>
>>>>>>     env.execute();
>>>>>> }
>>>>>>
>>>>>> but it fails right away with the exception below. The API documentation
>>>>>> says that rebalance can be used as input to map functions. It is not clear
>>>>>> to me what I am doing wrong, unless rebalancing is actually illegal. In
>>>>>> that case, it should not be available in the API, I guess :-)
>>>>>>
>>>>>> Please let me know how I can use rebalance.
>>>>>>
>>>>>> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE): java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
>>>>>> at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
>>>>>> at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>>>>>> at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>>>>>> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
>>>>>> Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>>>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
>>>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
>>>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
>>>>>> ... 7 more
>>>>>>
>>>>>
>>>>
>>>
>>
>
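
For readers unfamiliar with the strategy named in the error: `rebalance()` evenly redistributes the elements of a data set across the parallel instances of the following operator, which helps when a source (here, an HBase input format) produces skewed splits. The sketch below is a plain-Java illustration of that idea using round-robin assignment, which is assumed here to be how a forced rebalance behaves. It is not Flink's `OutputEmitter` code, and the class and method names (`RoundRobinRebalance`, `rebalance`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, self-contained illustration of a round-robin "rebalance":
 * records are dealt out to a fixed number of output partitions in turn,
 * so partition sizes differ by at most one regardless of input skew.
 * This is NOT Flink's implementation, only a conceptual sketch.
 */
public class RoundRobinRebalance {

    // Assigns each record to the next partition in cyclic order.
    public static <T> List<List<T>> rebalance(List<T> records, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<T>> partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        int next = 0; // index of the partition that receives the next record
        for (T record : records) {
            partitions.get(next).add(record);
            next = (next + 1) % numPartitions;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // A skewed input: imagine all of these came from a single source split.
        List<String> skewed = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            skewed.add("entity-" + i);
        }
        List<List<String>> out = rebalance(skewed, 4);
        for (int p = 0; p < out.size(); p++) {
            System.out.println("partition " + p + ": " + out.get(p).size() + " records");
        }
    }
}
```

With ten input records and four partitions, `main` reports 3, 3, 2 and 2 records for partitions 0 to 3: round-robin keeps partition sizes within one record of each other no matter how the input was split, at the cost of ignoring any key-based grouping.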
