flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
Date Tue, 25 Nov 2014 09:55:49 GMT
Yes, because Stefano is working on the stable version. I saw that you are
going to release version 7.1; do you think you could also include the new
HBase add-on (the one that generates Tuples)?

On Tue, Nov 25, 2014 at 10:31 AM, Stefano Bortoli <s.bortoli@gmail.com> wrote:

> Yes, I am using the Record data type. I can move the implementation to
> Tuples if that is what is needed.
>
> Thanks for the tip! :-)
>
> Regards,
> Stefano
>
> 2014-11-25 10:29 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>
>> I just had a look at this.
>>
>> Are you using the "Record" data type? The runtime classes for that type
>> do not seem to support rebalancing right now, but it is an easy fix...
>> On 25.11.2014 09:48, "Stefano Bortoli" <s.bortoli@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to run this code:
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         ExecutionEnvironment env = ExecutionEnvironment
>>>                 .getExecutionEnvironment();
>>>
>>>         // Custom HBase input format that produces Record instances
>>>         MyTableInputFormat inputFormat = new MyTableInputFormat();
>>>
>>>         DataSource<Record> dataset = env.createInput(inputFormat);
>>>
>>>         // rebalance() redistributes the filtered records evenly across
>>>         // the parallel instances of the following flatMap
>>>         DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates =
>>>                 dataset.filter(new EmptyEntityFilterFunction())
>>>                         .rebalance()
>>>                         .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>>>
>>>         DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates =
>>>                 candidates.filter(new SingleMatchFilterFunctionWithFlagMatch<>())
>>>                         .map(new MapToTuple3MapFunction<>());
>>>
>>>         // Deduplicate on fields 0 and 1, then group by field 0
>>>         DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
>>>                 .distinct(0, 1)
>>>                 .groupBy(0)
>>>                 .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());
>>>
>>>         duplicatesToprint.writeAsText("file:///tmp/"
>>>                 + EnsMaintenanceConstants.WORKING_TABLE + "/",
>>>                 WriteMode.OVERWRITE);
>>>
>>>         env.execute();
>>>     }
>>>
>>> but it fails right away with the exception below. The API documentation
>>> says that rebalance can be used as the input of map functions, so it is not
>>> clear to me what I am doing wrong, unless rebalancing is actually illegal
>>> here. In that case, I guess it should not be exposed in the API :-)
>>>
>>> Please let me know how I can use rebalance.
>>>
>>> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource
>>> (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter
>>> (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to
>>> slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE):
>>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>>> caused an error: Invalid shipping strategy for OutputEmitter:
>>> PARTITION_FORCED_REBALANCE
>>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
>>>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>>>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>>>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
>>> Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
>>>     ... 7 more
>>>
>>
>
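
For anyone hitting the same error: the stack trace shows that the IllegalArgumentException is raised in the RecordOutputEmitter constructor, i.e. the output emitter used for the Record type checks the requested shipping strategy and has no case for PARTITION_FORCED_REBALANCE. The Java program below is a minimal sketch of that failure mode and of what a forced rebalance does (spread records evenly over the parallel output channels, here round-robin). ShipStrategy, SketchEmitter and selectChannel are hypothetical names invented for illustration; this is not Flink's actual OutputEmitter code.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class EmitterSketch {

    // Hypothetical, simplified subset of shipping strategies; the names echo
    // the error message, but this is not Flink's actual enum.
    enum ShipStrategy { FORWARD, PARTITION_FORCED_REBALANCE }

    // Hypothetical emitter: it accepts only the strategies it was written for
    // and, for a forced rebalance, picks output channels round-robin.
    static final class SketchEmitter {
        private final ShipStrategy strategy;
        private final int numChannels;
        private int nextChannel = 0; // round-robin cursor

        SketchEmitter(ShipStrategy strategy, int numChannels, Set<ShipStrategy> supported) {
            if (numChannels < 1) {
                throw new IllegalArgumentException("numChannels must be positive");
            }
            if (!supported.contains(strategy)) {
                // Same kind of constructor check that fails in the stack trace.
                throw new IllegalArgumentException(
                        "Invalid shipping strategy for OutputEmitter: " + strategy);
            }
            this.strategy = strategy;
            this.numChannels = numChannels;
        }

        // Returns the index of the channel the next record is sent to.
        int selectChannel() {
            switch (strategy) {
                case FORWARD:
                    return 0; // single local channel in this sketch
                case PARTITION_FORCED_REBALANCE: {
                    int channel = nextChannel;
                    nextChannel = (nextChannel + 1) % numChannels;
                    return channel;
                }
                default:
                    throw new IllegalStateException("unhandled strategy: " + strategy);
            }
        }
    }

    public static void main(String[] args) {
        // An emitter that was not written for rebalancing rejects it up front.
        try {
            new SketchEmitter(ShipStrategy.PARTITION_FORCED_REBALANCE, 3,
                    EnumSet.of(ShipStrategy.FORWARD));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Once the strategy is supported, six records are spread over three channels.
        SketchEmitter emitter = new SketchEmitter(ShipStrategy.PARTITION_FORCED_REBALANCE, 3,
                EnumSet.allOf(ShipStrategy.class));
        List<Integer> channels = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            channels.add(emitter.selectChannel());
        }
        System.out.println(channels); // prints [0, 1, 2, 0, 1, 2]
    }
}
```

In the sketch, supporting the strategy only means accepting it and adding the round-robin case; in the thread, the immediate workaround Stefano offers is to move the job from Record to Tuple types.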
