flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Help with Flink experimental Table API
Date Thu, 11 Jun 2015 08:40:49 GMT
Cool, good to hear.

The PojoSerializer already handles null fields. The RowSerializer can be
modified in pretty much the same way. So you should start by looking at the
copy()/serialize()/deserialize() methods of PojoSerializer and then modify
RowSerializer in a similar way.
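
To illustrate the general idea, here is a minimal, self-contained sketch of
null-aware field serialization: write a boolean null marker before each field
and only write the value when the field is present. This is not the actual
Flink code. NullableRowCodec and roundTrip are hypothetical names, the row is
simplified to an array of nullable Int fields, and the arity is written to the
stream only to keep the example standalone (a real RowSerializer would
presumably know its arity from its field serializers).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for a row serializer; it does not use any Flink classes.
object NullableRowCodec {

  // Write each field as a null marker, followed by the value only if present.
  def serialize(fields: Array[Integer], out: DataOutputStream): Unit = {
    out.writeInt(fields.length) // assumption: arity is stored to keep the sketch standalone
    fields.foreach { f =>
      if (f == null) {
        out.writeBoolean(true) // marker: field is null, no payload follows
      } else {
        out.writeBoolean(false) // marker: field is present
        out.writeInt(f.intValue) // a real serializer would delegate to the field serializer here
      }
    }
  }

  // Read the marker first and only read a value when the field is not null.
  def deserialize(in: DataInputStream): Array[Integer] = {
    val arity = in.readInt()
    Array.fill[Integer](arity) {
      if (in.readBoolean()) null else Integer.valueOf(in.readInt())
    }
  }

  // Helper for trying the codec out: serialize to bytes and read them back.
  def roundTrip(fields: Array[Integer]): Array[Integer] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serialize(fields, out)
    out.flush()
    deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

Calling NullableRowCodec.roundTrip(Array[Integer](1, null, 3)) should return
an array with the null preserved in the middle position instead of throwing a
NullPointerException. The copy() method would need the same kind of null check.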

You can also send me a private mail if you want more in-depth explanations.

On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Shiti,
>
> here is the issue [1].
>
> Cheers,
> Till
>
> [1] https://issues.apache.org/jira/browse/FLINK-2203
>
> On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <ssaxena.ece@gmail.com>
> wrote:
>
>> Hi Aljoscha,
>>
>> Could you please point me to the JIRA tickets? If you could provide some
>> guidance on how to resolve these, I will work on them and raise a
>> pull-request.
>>
>> Thanks,
>> Shiti
>>
>> On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi,
>>> yes, I think the problem is that the RowSerializer does not support
>>> null values. I think we can add support for this; I will open a Jira issue.
>>>
>>> Another problem I see is that the aggregations cannot properly
>>> deal with null values. This would need separate support.
>>>
>>> Regards,
>>> Aljoscha
>>>
>>> On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <ssaxena.ece@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> In our project, we are using the Flink Table API and are facing the
>>>> following issue.
>>>>
>>>> We load data from a CSV file and create a DataSet[Row]. Some fields in the
>>>> CSV file can contain invalid entries, which we replace with null when
>>>> building the DataSet[Row].
>>>>
>>>> This DataSet[Row] is later transformed into a Table whenever required,
>>>> and specific operations such as select or aggregate are performed on it.
>>>>
>>>> When a null value is encountered, we get a NullPointerException and
>>>> the whole job fails. (We can see this by calling collect on the resulting
>>>> DataSet.)
>>>>
>>>> The error message is similar to:
>>>>
>>>> Job execution failed.
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.NullPointerException
>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>> at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
>>>> at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
>>>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>>> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:724)
>>>>
>>>> Could this be because the RowSerializer does not support null values?
>>>> (Similar to FLINK-629 <https://issues.apache.org/jira/browse/FLINK-629>)
>>>>
>>>> Currently, to work around this issue, we drop all rows that have null
>>>> values in the relevant columns. For example, we have a method cleanData
>>>> defined as:
>>>>
>>>> def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
>>>>     val whereClause: String = relevantColumns
>>>>         .map(cName => s"$cName.isNotNull")
>>>>         .mkString(" && ")
>>>>
>>>>     table.select(relevantColumns.mkString(",")).where(whereClause)
>>>> }
>>>>
>>>> Before operating on any Table, we call this method and then continue
>>>> with the task.
>>>>
>>>> Is this the right way to handle this? If not, please let me know how to
>>>> go about it.
>>>>
>>>>
>>>> Thanks,
>>>> Shiti
