flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davis k <forclazzproje...@gmail.com>
Subject Re: Error joining with Python API
Date Tue, 23 Aug 2016 04:22:02 GMT
Awesome, thanks Chesnay!

On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <chesnay@apache.org>
wrote:

> Found the issue, there was a missing tab in the chaining method...
>
>
> On 16.08.2016 12:12, Chesnay Schepler wrote:
>
>> looks like a bug, will look into it. :)
>>
>> On 16.08.2016 10:29, Ufuk Celebi wrote:
>>
>>> I think that this is actually a bug in Flink. I'm cc'ing Chesnay who
>>> originally contributed the Python API. He can probably tell whether
>>> this is a bug in the Python API or Flink ioperator side of things. ;)
>>>
>>> On Mon, Aug 15, 2016 at 10:14 PM, davis k <forclazzprojects@gmail.com>
>>> wrote:
>>>
>>>> I've got an issue performing joins using Python API in flink-1.1.1. With
>>>> this example code get an NPE (below). However, the NPE disappears when
>>>> the
>>>> filter is removed. Is there an error I'm making in this brief example
>>>> or is
>>>> this a Flink bug?
>>>>
>>>>
>>>>
>>>>      env = get_environment()
>>>>      env.set_parallelism(1)
>>>>
>>>>      input1 = env.from_elements("1|0","1|2") \
>>>>          .map(lambda x: x.split("|"))
>>>>
>>>>      input2 = env.from_elements("1|b") \
>>>>          .map(lambda x: x.split("|")) \
>>>>          .filter(lambda x: x[0] != "0")
>>>>
>>>>
>>>>      joined = input1 \
>>>>          .join(input2) \
>>>>          .where(0) \
>>>>          .equal_to(0) \
>>>>          .write_text("output.txt", write_mode=WriteMode.OVERWRITE)
>>>>
>>>>      env.execute(local=True)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------------------------------------
>>>>   The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method
>>>> caused an error.
>>>>          at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
>>>>
>>>>          at
>>>> org.apache.flink.client.program.PackagedProgram.invokeIntera
>>>> ctiveModeForExecution(PackagedProgram.java:403)
>>>>          at
>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>>>>
>>>>          at
>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>>>>
>>>>          at org.apache.flink.client.CliFrontend.run(CliFrontend.java:
>>>> 253)
>>>>          at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>>>>
>>>>          at org.apache.flink.client.CliFrontend.main(CliFrontend.java:
>>>> 1048)
>>>> Caused by: java.lang.NullPointerException
>>>>          at
>>>> org.apache.flink.api.java.operators.join.JoinOperatorSetsBas
>>>> e.<init>(JoinOperatorSetsBase.java:64)
>>>>          at
>>>> org.apache.flink.api.java.operators.join.JoinOperatorSetsBas
>>>> e.<init>(JoinOperatorSetsBase.java:59)
>>>>          at
>>>> org.apache.flink.api.java.operators.join.JoinOperatorSetsBas
>>>> e.<init>(JoinOperatorSetsBase.java:55)
>>>>          at
>>>> org.apache.flink.api.java.operators.JoinOperator$JoinOperato
>>>> rSets.<init>(JoinOperator.java:850)
>>>>          at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
>>>>
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.createJoinOpera
>>>> tion(PythonPlanBinder.java:591)
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
>>>>
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
>>>>
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
>>>>
>>>>          at
>>>> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
>>>>
>>>>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>          at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>
>>>>          at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>
>>>>          at java.lang.reflect.Method.invoke(Method.java:497)
>>>>          at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>>>>
>>>>          ... 6 more
>>>>
>>>
>>
>>
>

Mime
View raw message