flink-user mailing list archives

From Shiti Saxena <ssaxena....@gmail.com>
Subject Re: Help with Flink experimental Table API
Date Tue, 16 Jun 2015 04:25:32 GMT
Hi,

Can I work on the issue with TupleSerializer or is someone working on it?

On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> the reason why this doesn't work is that the TupleSerializer cannot deal
> with null values:
>
> @Test
> def testAggregationWithNull(): Unit = {
>
>  val env = ExecutionEnvironment.getExecutionEnvironment
>  val table = env.fromElements[(Integer, String)](
>  (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>
>  val total = table.select('_1.sum).collect().head.productElement(0)
>  assertEquals(total, 702)
> }
>
> It would have to be modified in a similar way to the PojoSerializer and
> RowSerializer. You could either leave the tests as they are now in your
> pull request or also modify the TupleSerializer. Both seem fine to me.
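
A minimal sketch of the null-handling idea discussed here, assuming a boolean marker is written before each field so that a null field never reaches the underlying field serializer. NullMarkerSketch and its methods are hypothetical names used only for illustration; this is not the code of Flink's TupleSerializer, PojoSerializer or RowSerializer, and it uses plain java.io streams instead of Flink's own I/O abstractions.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper, not part of Flink: serializes an (Integer, String)
// pair and writes a boolean null marker before each field.
object NullMarkerSketch {

  def serialize(value: (Integer, String), out: DataOutputStream): Unit = {
    // Marker first; the payload is written only when the field is present.
    out.writeBoolean(value._1 == null)
    if (value._1 != null) out.writeInt(value._1.intValue)
    out.writeBoolean(value._2 == null)
    if (value._2 != null) out.writeUTF(value._2)
  }

  def deserialize(in: DataInputStream): (Integer, String) = {
    // Read the marker, then read the payload only if the field was not null.
    val first: Integer = if (in.readBoolean()) null else Integer.valueOf(in.readInt())
    val second: String = if (in.readBoolean()) null else in.readUTF()
    (first, second)
  }

  // Serializes to an in-memory buffer and reads the value back.
  def roundTrip(value: (Integer, String)): (Integer, String) = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serialize(value, out)
    out.flush()
    deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

The cost of this layout is one extra byte per field; copy() would need the same null check so that null fields are passed through rather than handed to the field serializer.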
>
> Cheers,
>
> Aljoscha
>
>
> On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <ssaxena.ece@gmail.com> wrote:
>
>> Hi,
>>
>> Re-writing the test in the following manner works. But I am not sure if
>> this is the correct way.
>>
>> def testAggregationWithNull(): Unit = {
>>
>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>     val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234,
>> "b"), (345, "c"), (0, "d"))
>>
>>     implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
>>       Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>> Seq("id", "name"))
>>
>>     val rowDataSet = dataSet.map {
>>       entry =>
>>         val row = new Row(2)
>>         val amount = if(entry._1<100) null else entry._1
>>         row.setField(0, amount)
>>         row.setField(1, entry._2)
>>         row
>>     }
>>
>>     val total =
>> rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
>>     assertEquals(total, 702)
>>   }
>>
>>
>>
>> On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <ssaxena.ece@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> For
>>>
>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
>>> (345, "c"), (null, "d")).toTable
>>>
>>> I get the following error,
>>>
>>> Error translating node 'Data Source "at
>>> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
>>> (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[
>>> GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
>>> [ordering=null, grouped=null, unique=null] ]]': null
>>> org.apache.flink.optimizer.CompilerException: Error translating node
>>> 'Data Source "at
>>> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
>>> (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[
>>> GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
>>> [ordering=null, grouped=null, unique=null] ]]': null
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>> at
>>> org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>> at
>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>> at
>>> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>> at
>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>> at
>>> org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at
>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>> at
>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>> at
>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>> at
>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>> at
>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>> at
>>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>> at
>>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>> Caused by: java.lang.NullPointerException
>>> at
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>> at
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>> at
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
>>> at
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
>>> at
>>> org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>> at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>> at
>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>> at
>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
>>> at
>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
>>> ... 55 more
>>>
>>>
>>> Does this mean that the collect method is being called before doing the
>>> aggregation? Is this because base serializers do not handle null values
>>> like POJOSerializer? And is that why fromCollection does not support
>>> collections with null values?
>>>
>>> Or I could write the test using a file load if that's alright.
>>>
>>>
>>> On Sun, Jun 14, 2015 at 11:11 PM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> sorry, my mail client sent before I was done.
>>>>
>>>> I think the problem is that the Scala compiler derives a wrong type for
>>>> this statement:
>>>> val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null,
>>>> "d")).toTable
>>>>
>>>> Because of the null value it derives (Any, String) as the type. If you
>>>> do it like this, I think it should work:
>>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
>>>> (345, "c"), (null, "d")).toTable
>>>>
>>>> I used Integer instead of Int because Scala will complain that null is
>>>> not a valid value for Int otherwise.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>>
>>>> On Sun, 14 Jun 2015 at 19:34 Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I think the problem is that the Scala compiler derives a wrong type
>>>>> for this statement:
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <ssaxena.ece@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> I created the issue FLINK-2210
>>>>>> <https://issues.apache.org/jira/browse/FLINK-2210> for aggregate on
>>>>>> null. I made changes to ExpressionAggregateFunction to ignore null
>>>>>> values. But I am unable to create a Table with null values in tests.
>>>>>>
>>>>>> The code I used is,
>>>>>>
>>>>>> def testAggregationWithNull(): Unit = {
>>>>>>
>>>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>     val table = env.fromElements((123, "a"), (234, "b"), (345, "c"),
>>>>>> (null, "d")).toTable
>>>>>>
>>>>>>     val total = table.select('_1.sum).collect().head.productElement(0)
>>>>>>     assertEquals(total, 702)
>>>>>>   }
>>>>>>
>>>>>> and the error I get is,
>>>>>>
>>>>>> org.apache.flink.api.table.ExpressionException: Invalid expression
>>>>>> "('_1).sum": Unsupported type GenericType<java.lang.Object> for
>>>>>> aggregation ('_1).sum. Only numeric data types supported.
>>>>>> at
>>>>>> org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
>>>>>> at
>>>>>> org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
>>>>>> at
>>>>>> org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
>>>>>> at
>>>>>> org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>> at
>>>>>> org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
>>>>>> at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>> at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>> at
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>> at
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>> at
>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>> at
>>>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>> at
>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>>>> at org.apache.flink.api.table.Table.select(Table.scala:59)
>>>>>> at
>>>>>> org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at
>>>>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>>>> at
>>>>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>> at
>>>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>> at
>>>>>> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>>>> at
>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>>>> at
>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>>>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>> at
>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>>>>> at
>>>>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>>>>> at
>>>>>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>>>>> at
>>>>>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at
>>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>
>>>>>>
>>>>>> The ExecutionEnvironment.fromCollection method also throws an error
>>>>>> when the collection contains a null.
>>>>>>
>>>>>> Could you please point out what I am doing wrong? How do we create a
>>>>>> Table with null values?
>>>>>>
>>>>>> In our application, we load a file and transform each line into a Row
>>>>>> resulting in a DataSet[Row]. This DataSet[Row] is then converted into
>>>>>> Table. Should I use the same approach for the test case?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Shiti
>>>>>>
>>>>>> On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <ssaxena.ece@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'll do the fix
>>>>>>>
>>>>>>> On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <
>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>
>>>>>>>> I merged your PR for the RowSerializer. Teaching the aggregators to
>>>>>>>> deal with null values should be a very simple fix in
>>>>>>>> ExpressionAggregateFunction.scala. There it is simply always
>>>>>>>> aggregating the values without checking whether they are null. If
>>>>>>>> you want you can also fix that or I can quickly fix it.
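
A rough sketch of what "checking whether they are null" could look like for a sum, assuming null inputs are simply skipped. NullSkippingSum is a hypothetical stand-in written for illustration, not the actual code in ExpressionAggregateFunction.scala.

```scala
// Hypothetical helper, not Flink's ExpressionAggregateFunction: sums boxed
// integers and leaves the running total unchanged when a value is null.
object NullSkippingSum {
  def sum(values: Seq[Integer]): Int =
    values.foldLeft(0) { (acc, v) =>
      if (v == null) acc else acc + v.intValue
    }
}
```

With the values used in the tests in this thread (123, 234, 345 and one null) this yields 702. Note that the sketch returns 0 when every input is null; whether an all-null aggregate should instead be null is a separate design decision that this sketch does not address.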
>>>>>>>>
>>>>>>>> On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Cool, good to hear.
>>>>>>>>>
>>>>>>>>> The PojoSerializer already handles null fields. The RowSerializer
>>>>>>>>> can be modified in pretty much the same way. So you should start by
>>>>>>>>> looking at the copy()/serialize()/deserialize() methods of
>>>>>>>>> PojoSerializer and then modify RowSerializer in a similar way.
>>>>>>>>>
>>>>>>>>> You can also send me a private mail if you want more in-depth
>>>>>>>>> explanations.
>>>>>>>>>
>>>>>>>>> On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <trohrmann@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Shiti,
>>>>>>>>>>
>>>>>>>>>> here is the issue [1].
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2203
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <
>>>>>>>>>> ssaxena.ece@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>
>>>>>>>>>>> Could you please point me to the JIRA tickets? If you could
>>>>>>>>>>> provide some guidance on how to resolve these, I will work on
>>>>>>>>>>> them and raise a pull-request.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Shiti
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <
>>>>>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> yes, I think the problem is that the RowSerializer does not
>>>>>>>>>>>> support null-values. I think we can add support for this, I will
>>>>>>>>>>>> open a Jira issue.
>>>>>>>>>>>>
>>>>>>>>>>>> Another problem I then see is that the aggregations can not
>>>>>>>>>>>> properly deal with null-values. This would need separate support.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <
>>>>>>>>>>>> ssaxena.ece@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> In our project, we are using the Flink Table API and are
>>>>>>>>>>>>> facing the following issues,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We load data from a CSV file and create a DataSet[Row]. The
>>>>>>>>>>>>> CSV file can also have invalid entries in some of the fields
>>>>>>>>>>>>> which we replace with null when building the DataSet[Row].
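
The parsing step described above might look roughly like the following sketch. It assumes a two-column "id,name" layout and uses a plain Array[Any] as a stand-in for Flink's Row; CsvRowSketch and parseLine are hypothetical names, and the real application code is not shown in this thread.

```scala
import scala.util.Try

// Hypothetical helper: parses one "id,name" line into a two-field array.
// An id that is missing or not a valid integer becomes null.
object CsvRowSketch {
  def parseLine(line: String): Array[Any] = {
    // Limit -1 keeps trailing empty fields instead of dropping them.
    val fields = line.split(",", -1)
    val id: Any = Try(Integer.valueOf(fields(0).trim)).getOrElse(null)
    val name: Any = if (fields.length > 1) fields(1) else null
    Array(id, name)
  }
}
```

In the setup described in this thread, each such array would correspond to one Row, which is why the serializer has to cope with null fields.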
>>>>>>>>>>>>>
>>>>>>>>>>>>> This DataSet[Row] is later on transformed to a Table whenever
>>>>>>>>>>>>> required and specific operations such as select or aggregate
>>>>>>>>>>>>> are performed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When a null value is encountered, we get a null pointer
>>>>>>>>>>>>> exception and the whole job fails. (We can see this by calling
>>>>>>>>>>>>> collect on the resulting DataSet.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> The error message is similar to,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Job execution failed.
>>>>>>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job
>>>>>>>>>>>>> execution failed.
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>>>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could this be because the RowSerializer does not support null
>>>>>>>>>>>>> values? (Similar to Flink-629
>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-629>)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, to overcome this issue, we are ignoring all the
>>>>>>>>>>>>> rows which may have null values. For example, we have a method
>>>>>>>>>>>>> cleanData defined as,
>>>>>>>>>>>>>
>>>>>>>>>>>>> def cleanData(table:Table, relevantColumns:Seq[String]):Table
>>>>>>>>>>>>> = {
>>>>>>>>>>>>>     val whereClause: String = relevantColumns.map{
>>>>>>>>>>>>>         cName=>
>>>>>>>>>>>>>             s"$cName.isNotNull"
>>>>>>>>>>>>>     }.mkString(" && ")
>>>>>>>>>>>>>
>>>>>>>>>>>>>     val result :Table =
>>>>>>>>>>>>> table.select(relevantColumns.mkString(",")).where(whereClause)
>>>>>>>>>>>>>     result
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> Before operating on any Table, we use this method and then
>>>>>>>>>>>>> continue with task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is this the right way to handle this? If not please let me
>>>>>>>>>>>>> know how to go about it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Shiti
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>
>>
