flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
Date Tue, 25 Nov 2014 09:59:47 GMT
Yes, it makes sense to add the HBase fix to that.

On Tue, Nov 25, 2014 at 10:55 AM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Yes, because Stefano is working on the stable version. I saw that you are
> going to release the 7.1 version; do you think you could also include the new
> HBase addon (the one that generates Tuples)?
>
> On Tue, Nov 25, 2014 at 10:31 AM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>
>> Yes, I am using the Record data type. I can move the implementation to
>> Tuples if that is what is needed.
>>
>> Thanks for the tip! :-)
>>
>> Regards,
>> Stefano
>>
>> 2014-11-25 10:29 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>
>>> I just had a look at this.
>>>
>>> Are you using the "Record" data type? That one's tools seem not to
>>> support this right now, but it is an easy fix...
>>>
>>> On 25.11.2014 09:48, "Stefano Bortoli" <s.bortoli@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run this code:
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>     MyTableInputFormat inputFormat = new MyTableInputFormat();
>>>>
>>>>     DataSource<Record> dataset = env.createInput(inputFormat);
>>>>
>>>>     DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates =
>>>>         dataset
>>>>             .filter(new EmptyEntityFilterFunction())
>>>>             .rebalance()
>>>>             .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>>>>
>>>>     DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates =
>>>>         candidates
>>>>             .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
>>>>             .map(new MapToTuple3MapFunction<>());
>>>>
>>>>     DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint =
>>>>         duplicates
>>>>             .distinct(0, 1)
>>>>             .groupBy(0)
>>>>             .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());
>>>>
>>>>     duplicatesToprint.writeAsText(
>>>>         "file:///tmp/" + EnsMaintenanceConstants.WORKING_TABLE + "/",
>>>>         WriteMode.OVERWRITE);
>>>>
>>>>     env.execute();
>>>> }
>>>>
>>>> but it fails right away with the exception below. The API documentation
>>>> says that rebalance can be used as input to map functions. It is not clear
>>>> to me what I am doing wrong, unless rebalancing is actually illegal here. In
>>>> that case, I guess it should not be available in the API :-)
>>>>
>>>> Please let me know how I could use rebalance.
>>>>
>>>> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource
>>>> (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter
>>>> (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to
>>>> slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE):
>>>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>>>> caused an error: Invalid shipping strategy for OutputEmitter:
>>>> PARTITION_FORCED_REBALANCE
>>>> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
>>>> at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
>>>> at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>>>> at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>>>> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
>>>> Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
>>>> at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
>>>> at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
>>>> at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
>>>> ... 7 more
>>>>
>>>
>>
>
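
For readers following the thread, here is a minimal sketch in plain Java (no Flink dependency) of the two things under discussion: what rebalancing does, assuming it distributes records round-robin over the parallel channels, and how an emitter that was never written for a given strategy fails in its constructor, as the trace shows. All names here (RebalanceSketch, Strategy, LimitedEmitter, RoundRobinSelector) are hypothetical and purely illustrative; this is not Flink's actual RecordOutputEmitter code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RebalanceSketch {

    /** Hypothetical stand-in for the shipping strategies mentioned in the trace. */
    enum Strategy { FORWARD, PARTITION_FORCED_REBALANCE }

    /**
     * Hypothetical emitter that only knows FORWARD. Any other strategy is
     * rejected at construction time, which mirrors the shape of the reported error.
     */
    static final class LimitedEmitter {
        LimitedEmitter(Strategy strategy) {
            if (strategy != Strategy.FORWARD) {
                throw new IllegalArgumentException(
                        "Invalid shipping strategy for OutputEmitter: " + strategy);
            }
        }
    }

    /** Picks output channels in round-robin order: 0, 1, ..., n-1, 0, 1, ... */
    static final class RoundRobinSelector {
        private final int numChannels;
        private int next = 0;

        RoundRobinSelector(int numChannels) {
            if (numChannels <= 0) {
                throw new IllegalArgumentException("numChannels must be positive");
            }
            this.numChannels = numChannels;
        }

        int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    /** Spreads the records evenly over numChannels partitions, one record at a time. */
    static <T> List<List<T>> rebalance(List<T> records, int numChannels) {
        RoundRobinSelector selector = new RoundRobinSelector(numChannels);
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            partitions.add(new ArrayList<T>());
        }
        for (T record : records) {
            partitions.get(selector.selectChannel()).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Seven records over three channels: sizes differ by at most one.
        System.out.println(rebalance(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
        // prints [[1, 4, 7], [2, 5], [3, 6]]

        try {
            new LimitedEmitter(Strategy.PARTITION_FORCED_REBALANCE);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions the program itself is fine; the failure comes from the emitter used for the chosen data type not handling the strategy. That is consistent with the suggestion above to move from Record to Tuple types until the fix is released.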
