flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Help with Flink experimental Table API
Date Tue, 16 Jun 2015 08:27:38 GMT
Yes, what I meant was to have a single bit mask that is written before all
the fields are written. Then, for example, 1011 would mean that fields 1, 2,
and 4 are non-null while field 3 is null.
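
For illustration, here is a minimal sketch of that idea (the helper names
and the use of a plain Int mask are my own, not the actual TupleSerializer
code); a single Int covers up to 32 fields:

import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Bit i of the mask is set iff field i+1 is non-null.
def writeNullMask(fields: Array[AnyRef], target: DataOutputView): Unit = {
  var mask = 0
  for (i <- fields.indices) {
    if (fields(i) != null) {
      mask |= 1 << i
    }
  }
  target.writeInt(mask)
}

// Returns one flag per field; true means the field is non-null.
def readNullMask(arity: Int, source: DataInputView): Array[Boolean] = {
  val mask = source.readInt()
  Array.tabulate(arity)(i => (mask & (1 << i)) != 0)
}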

On Tue, 16 Jun 2015 at 10:24 Shiti Saxena <ssaxena.ece@gmail.com> wrote:

> Can we use 0 (false) and 1 (true)?
>
> On Tue, Jun 16, 2015 at 1:32 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> One more thing, it would be good if the TupleSerializer didn't write a
>> boolean for every field. A single integer could be used where one bit
>> specifies if a given field is null or not. (Maybe we should also add this
>> to the RowSerializer in the future.)
>>
>> On Tue, 16 Jun 2015 at 07:30 Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> I think you can work on it. By the way, there are actually two
>>> serializers. For Scala, CaseClassSerializer is responsible for tuples as
>>> well. In Java, TupleSerializer is responsible for, well, Tuples.
>>>
>>> On Tue, 16 Jun 2015 at 06:25 Shiti Saxena <ssaxena.ece@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can I work on the issue with TupleSerializer or is someone working on
>>>> it?
>>>>
>>>> On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <aljoscha@apache.org
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>> the reason why this doesn't work is that the TupleSerializer cannot
>>>>> deal with null values:
>>>>>
>>>>> @Test
>>>>> def testAggregationWithNull(): Unit = {
>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>   val table = env.fromElements[(Integer, String)](
>>>>>     (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>
>>>>>   val total = table.select('_1.sum).collect().head.productElement(0)
>>>>>   assertEquals(total, 702)
>>>>> }
>>>>>
>>>>> It would have to be modified in a similar way to the PojoSerializer
>>>>> and RowSerializer. You could either leave the tests as they are now in
>>>>> your pull request or also modify the TupleSerializer. Both seem fine to me.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <ssaxena.ece@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Re-writing the test in the following manner works. But I am not sure
>>>>>> if this is the correct way.
>>>>>>
>>>>>> def testAggregationWithNull(): Unit = {
>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>   val dataSet = env.fromElements[(Integer, String)](
>>>>>>     (123, "a"), (234, "b"), (345, "c"), (0, "d"))
>>>>>>
>>>>>>   implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
>>>>>>     Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>>>>>>     Seq("id", "name"))
>>>>>>
>>>>>>   val rowDataSet = dataSet.map {
>>>>>>     entry =>
>>>>>>       val row = new Row(2)
>>>>>>       val amount = if (entry._1 < 100) null else entry._1
>>>>>>       row.setField(0, amount)
>>>>>>       row.setField(1, entry._2)
>>>>>>       row
>>>>>>   }
>>>>>>
>>>>>>   val total =
>>>>>>     rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
>>>>>>   assertEquals(total, 702)
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <ssaxena.ece@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> For
>>>>>>>
>>>>>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234,
>>>>>>> "b"), (345, "c"), (null, "d")).toTable
>>>>>>>
>>>>>>> I get the following error,
>>>>>>>
>>>>>>> Error translating node 'Data Source "at
>>>>>>> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
>>>>>>> (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[
>>>>>>> GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
>>>>>>> [ordering=null, grouped=null, unique=null] ]]': null
>>>>>>> org.apache.flink.optimizer.CompilerException: Error translating node
>>>>>>> 'Data Source "at
>>>>>>> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
>>>>>>> (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[
>>>>>>> GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
>>>>>>> [ordering=null, grouped=null, unique=null] ]]': null
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>>>>>> at
>>>>>>> org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>>>>> at
>>>>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>>>>> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>>>>>> at
>>>>>>> org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at
>>>>>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>>>>> at
>>>>>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>>> at
>>>>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>>>>> at
>>>>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>>> at
>>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>> at
>>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>> at
>>>>>>> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>>>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>>>>> at
>>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>>>>> at
>>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>> at
>>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>> at
>>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>>>>>> at
>>>>>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>>>>>> at
>>>>>>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>>>>>> at
>>>>>>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at
>>>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>>>>> at
>>>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
>>>>>>> at
>>>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
>>>>>>> at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at
>>>>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
>>>>>>> at
>>>>>>> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
>>>>>>> ... 55 more
>>>>>>>
>>>>>>>
>>>>>>> Does this mean that the collect method is being called before doing
>>>>>>> the aggregation? Is this because base serializers do not handle null
>>>>>>> values like the POJOSerializer does? And is that why fromCollection
>>>>>>> does not support collections with null values?
>>>>>>>
>>>>>>> Or I could write the test using a file load, if that's alright.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 14, 2015 at 11:11 PM, Aljoscha Krettek <
>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> sorry, my mail client sent before I was done.
>>>>>>>>
>>>>>>>> I think the problem is that the Scala compiler derives a wrong type
>>>>>>>> for this statement:
>>>>>>>>
>>>>>>>> val table = env.fromElements((123, "a"), (234, "b"), (345, "c"),
>>>>>>>> (null, "d")).toTable
>>>>>>>>
>>>>>>>> Because of the null value it derives (Any, String) as the type. If
>>>>>>>> you do it like this, I think it should work:
>>>>>>>>
>>>>>>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234,
>>>>>>>> "b"), (345, "c"), (null, "d")).toTable
>>>>>>>>
>>>>>>>> I used Integer instead of Int because Scala will complain that null
>>>>>>>> is not a valid value for Int otherwise.
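>>>>>>>>
>>>>>>>> A quick way to see the inference at work (illustrative snippet, not
>>>>>>>> from the test code):
>>>>>>>>
>>>>>>>> // Int and Null unify to their least upper bound, Any:
>>>>>>>> val xs = Seq((123, "a"), (null, "d"))  // inferred Seq[(Any, String)]
>>>>>>>> // With java.lang.Integer, null is a valid element value:
>>>>>>>> val ys = Seq[(Integer, String)]((123, "a"), (null, "d"))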
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, 14 Jun 2015 at 19:34 Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I think the problem is that the Scala compiler derives a wrong
>>>>>>>>> type for this statement:
>>>>>>>>>
>>>>>>>>> On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <ssaxena.ece@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>
>>>>>>>>>> I created the issue FLINK-2210
>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-2210> for aggregate
>>>>>>>>>> on null. I made changes to ExpressionAggregateFunction to ignore
>>>>>>>>>> null values. But I am unable to create a Table with null values
>>>>>>>>>> in tests.
>>>>>>>>>>
>>>>>>>>>> The code I used is,
>>>>>>>>>>
>>>>>>>>>> def testAggregationWithNull(): Unit = {
>>>>>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>   val table = env.fromElements((123, "a"), (234, "b"), (345, "c"),
>>>>>>>>>>     (null, "d")).toTable
>>>>>>>>>>
>>>>>>>>>>   val total = table.select('_1.sum).collect().head.productElement(0)
>>>>>>>>>>   assertEquals(total, 702)
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> and the error I get is,
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.table.ExpressionException: Invalid
>>>>>>>>>> expression "('_1).sum": Unsupported type
>>>>>>>>>> GenericType<java.lang.Object> for aggregation ('_1).sum. Only
>>>>>>>>>> numeric data types supported.
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
>>>>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>>>>>>>> at org.apache.flink.api.table.Table.select(Table.scala:59)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at
>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>>> at
>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>>>>> at
>>>>>>>>>> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>>>>>>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>>>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>>>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>>>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>>>>> at
>>>>>>>>>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>>>>> at
>>>>>>>>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>>>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>>>>>>>>> at
>>>>>>>>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>>>>>>>>> at
>>>>>>>>>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>>>>>>>>> at
>>>>>>>>>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at
>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>>> at
>>>>>>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The ExecutionEnvironment.fromCollection method also throws an
>>>>>>>>>> error when the collection contains a null.
>>>>>>>>>>
>>>>>>>>>> Could you please point out what I am doing wrong? How do we
>>>>>>>>>> create a Table with null values?
>>>>>>>>>>
>>>>>>>>>> In our application, we load a file and transform each line into a
>>>>>>>>>> Row, resulting in a DataSet[Row]. This DataSet[Row] is then
>>>>>>>>>> converted into a Table. Should I use the same approach for the
>>>>>>>>>> test case?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Shiti
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <
>>>>>>>>>> ssaxena.ece@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'll do the fix
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <
>>>>>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I merged your PR for the RowSerializer. Teaching the
>>>>>>>>>>>> aggregators to deal with null values should be a very simple
>>>>>>>>>>>> fix in ExpressionAggregateFunction.scala. There it is simply
>>>>>>>>>>>> always aggregating the values without checking whether they are
>>>>>>>>>>>> null. If you want you can also fix that, or I can quickly fix it.
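>>>>>>>>>>>>
>>>>>>>>>>>> For illustration, a minimal sketch of such a guard (the class
>>>>>>>>>>>> and method names here are made up, not the actual
>>>>>>>>>>>> ExpressionAggregateFunction API):
>>>>>>>>>>>>
>>>>>>>>>>>> // Sum that skips null values instead of failing on them.
>>>>>>>>>>>> class NullIgnoringIntSum {
>>>>>>>>>>>>   private var sum: Long = 0L
>>>>>>>>>>>>
>>>>>>>>>>>>   def aggregate(value: Any): Unit = {
>>>>>>>>>>>>     if (value != null) { // the missing null check
>>>>>>>>>>>>       sum += value.asInstanceOf[Int]
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   def getAggregate: Long = sum
>>>>>>>>>>>> }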
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <
>>>>>>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Cool, good to hear.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The PojoSerializer already handles null fields. The
>>>>>>>>>>>>> RowSerializer can be modified in pretty much the same way. So
>>>>>>>>>>>>> you should start by looking at the copy()/serialize()/deserialize()
>>>>>>>>>>>>> methods of PojoSerializer and then modify RowSerializer in a
>>>>>>>>>>>>> similar way.
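>>>>>>>>>>>>>
>>>>>>>>>>>>> As a rough sketch of that flag-per-field pattern (assuming a
>>>>>>>>>>>>> fieldSerializers array; this is not the actual PojoSerializer
>>>>>>>>>>>>> or RowSerializer code):
>>>>>>>>>>>>>
>>>>>>>>>>>>> // Write a boolean flag before each field and only serialize
>>>>>>>>>>>>> // non-null fields; deserialize() would mirror this by reading
>>>>>>>>>>>>> // the flag first.
>>>>>>>>>>>>> def serialize(row: Row, target: DataOutputView): Unit = {
>>>>>>>>>>>>>   var i = 0
>>>>>>>>>>>>>   while (i < row.productArity) {
>>>>>>>>>>>>>     val field = row.productElement(i)
>>>>>>>>>>>>>     target.writeBoolean(field == null)
>>>>>>>>>>>>>     if (field != null) {
>>>>>>>>>>>>>       fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
>>>>>>>>>>>>>         .serialize(field, target)
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>     i += 1
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }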
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can also send me a private mail if you want more in-depth
>>>>>>>>>>>>> explanations.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <
>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Shiti,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> here is the issue [1].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2203
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <
>>>>>>>>>>>>>> ssaxena.ece@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you please point me to the JIRA tickets? If you could
>>>>>>>>>>>>>>> provide some guidance on how to resolve these, I will work
>>>>>>>>>>>>>>> on them and raise a pull request.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Shiti
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <
>>>>>>>>>>>>>>> aljoscha@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> yes, I think the problem is that the RowSerializer does not
>>>>>>>>>>>>>>>> support null values. I think we can add support for this; I
>>>>>>>>>>>>>>>> will open a Jira issue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Another problem I then see is that the aggregations cannot
>>>>>>>>>>>>>>>> properly deal with null values. This would need separate
>>>>>>>>>>>>>>>> support.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <
>>>>>>>>>>>>>>>> ssaxena.ece@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our project, we are using the Flink Table API and are
>>>>>>>>>>>>>>>>> facing the following issues.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We load data from a CSV file and create a DataSet[Row].
>>>>>>>>>>>>>>>>> The CSV file can also have invalid entries in some of the
>>>>>>>>>>>>>>>>> fields, which we replace with null when building the
>>>>>>>>>>>>>>>>> DataSet[Row].
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This DataSet[Row] is later transformed into a Table
>>>>>>>>>>>>>>>>> whenever required, and specific operations such as select,
>>>>>>>>>>>>>>>>> aggregate, etc. are performed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When a null value is encountered, we get a null pointer
>>>>>>>>>>>>>>>>> exception and the whole job fails. (We can see this by
>>>>>>>>>>>>>>>>> calling collect on the resulting DataSet.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The error message is similar to:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Job execution failed.
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job
>>>>>>>>>>>>>>>>> execution failed.
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>>>>>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>>>>>>>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Could this be because the RowSerializer does not support
>>>>>>>>>>>>>>>>> null values? (Similar to FLINK-629
>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-629>)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently, to overcome this issue, we are ignoring all the
>>>>>>>>>>>>>>>>> rows which may have null values. For example, we have a
>>>>>>>>>>>>>>>>> method cleanData defined as,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> def cleanData(table: Table,
>>>>>>>>>>>>>>>>>     relevantColumns: Seq[String]): Table = {
>>>>>>>>>>>>>>>>>   val whereClause: String = relevantColumns.map {
>>>>>>>>>>>>>>>>>     cName => s"$cName.isNotNull"
>>>>>>>>>>>>>>>>>   }.mkString(" && ")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   val result: Table =
>>>>>>>>>>>>>>>>>     table.select(relevantColumns.mkString(",")).where(whereClause)
>>>>>>>>>>>>>>>>>   result
>>>>>>>>>>>>>>>>> }
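>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For example, cleanData(table, Seq("id", "name")) builds the
>>>>>>>>>>>>>>>>> where clause "id.isNotNull && name.isNotNull".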
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Before operating on any Table, we use this method and then
>>>>>>>>>>>>>>>>> continue with the task.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is this the right way to handle this? If not, please let me
>>>>>>>>>>>>>>>>> know how to go about it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Shiti
