flink-user mailing list archives

From Stefano Baghino <stefano.bagh...@radicalbit.io>
Subject Re: Map from Tuple to Case Class
Date Wed, 04 May 2016 14:04:31 GMT
I just noticed my snippets contain a whole lot of errors, but I'm glad
it's been helpful. :)

On Wed, May 4, 2016 at 3:59 PM, Robert Schmidtke <ro.schmidtke@gmail.com>
wrote:

> Thanks Stefano! I guess you're right, it's probably not too bad except the
> MapFunction, which I have swapped with your suggestion now. I was just a
> bit confused by the fact that I had to state so many types, where I thought
> they could be inferred automatically. I tried variations of the
> "non-explicit" MapFunction, but I must have messed up something. The Array
> matching is pretty handy as well. I'm good to go now, all works well and
> looks a bit more Scala-y now :)
>
> Robert
>
> On Wed, May 4, 2016 at 3:42 PM, Stefano Baghino <
> stefano.baghino@radicalbit.io> wrote:
>
>> The only real noise I see is the usage of a MapFunction, which can be
>> rewritten like this in Scala:
>>
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>> rankingsInputPath, job).map[Ranking] {
>>     (value: (LongWritable, Text)) => {
>>       val Array(name, n, m) = value._2.toString.split(",")
>>       Ranking(name, n.toInt, m.toInt) // no new needed for case classes
>>     }
>>   }
>>
>> As you may have noticed, I've also destructured the tuple in the first
>> line. Another, more concise way to do this destructuring is to use an
>> API extension [1] (which won't be available before 1.1, I suppose).
>>
>> Since you're parsing textual data, it may also make sense to handle
>> error conditions for malformed inputs; here is an example that uses
>> flatMap to do so:
>>
>> import org.apache.flink.util.Collector // needed for the two-argument flatMap
>> import scala.util.{Try, Success, Failure} // needed to work with the
>> "functional" Try
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>> rankingsInputPath, job).flatMap[Ranking] {
>>     (value: (LongWritable, Text), out: Collector[Ranking]) => {
>>       Try {
>>         val Array(name, n, m) = value._2.toString.split(",") // exception
>> thrown if array size != 3
>>         Ranking(name, n.toInt, m.toInt) // exception thrown if n or m are
>> not numbers
>>       } match {
>>         case Success(ranking) => out.collect(ranking)
>>         case Failure(exception) => // deal with malformed input, perhaps
>> log it
>>       }
>>     }
>>   }
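The Try/match pattern can be exercised on its own in plain Scala, without a Flink runtime (the parse helper and input lines here are hypothetical):

```scala
import scala.util.{Try, Success, Failure}

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

// Standalone parser mirroring the flatMap body: a MatchError on the Array
// extractor or a NumberFormatException on toInt both land in Failure.
def parse(line: String): Option[Ranking] =
  Try {
    val Array(name, n, m) = line.split(",") // fails if not exactly 3 fields
    Ranking(name, n.toInt, m.toInt)         // fails if n or m are not numbers
  } match {
    case Success(r) => Some(r)
    case Failure(_) => None // malformed input: drop (or log) the record
  }

println(parse("example.com,42,7")) // Some(Ranking(example.com,42,7))
println(parse("garbage"))          // None
```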
>>
>> Feel free to ask me for any kind of clarifications on the snippets [2] I
>> posted, I'll gladly help you further if you need it.
>>
>> Last note: I'm not a user but I believe Shapeless has some very handy
>> constructs to move back and forth between tuples and case classes (but
>> please take this with a grain of salt).
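For the tuple-to-case-class direction specifically, the standard library already covers it without Shapeless: a case class's apply method can be tupled (a small sketch; the reverse direction would need unapply or Shapeless's Generic):

```scala
case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

// Eta-expand apply to a Function3, then tuple it into a function that
// accepts a (String, Int, Int) directly.
val fromTuple: ((String, Int, Int)) => Ranking = (Ranking.apply _).tupled

val r = fromTuple(("example.com", 42, 7))
println(r) // Ranking(example.com,42,7)
```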
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
>> [2]: I didn't test them, so caution is advisable ;)
>>
>> On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro.schmidtke@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>>
>>> first up, I'm new to Scala, so please bear with me, but I could not find
>>> any solution on the web or the Flink documentation. I'm having trouble
>>> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
>>> class. I got it to work, however in a way that I feel is too verbose for
>>> Scala:
>>>
>>>
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.api.scala._
>>>
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.io.Text
>>>
>>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>>> val rankingsInput: DataSet[Ranking] =
>>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>>> rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text),
>>> Ranking] {
>>>   override def map(value: (LongWritable, Text)) = {
>>>       val splits = value._2.toString.split(",")
>>>       new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>>>     }
>>>   })
>>>
>>>
>>> Is there a simpler way of doing this? All other variants I've tried
>>> yield some type information errors.
>>>
>>> Thanks in advance!
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>
>
> --
> My GPG Key ID: 336E2680
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
