flink-user mailing list archives

From Stefano Baghino <stefano.bagh...@radicalbit.io>
Subject Re: Map from Tuple to Case Class
Date Wed, 04 May 2016 13:42:40 GMT
The only real noise I see is the use of a MapFunction, which can be
rewritten like this in Scala:

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
      rankingsInputPath, job)
    .map[Ranking] { (value: (LongWritable, Text)) =>
      val Array(name, n, m) = value._2.toString.split(",")
      Ranking(name, n.toInt, m.toInt) // no `new` needed for case classes
    }

As you may have noticed, I've also destructured the result of the split on
the first line of the function. A more concise way to do this kind of
destructuring is to use an API extension [1] (which won't be available
before 1.1, I suppose).
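
For reference, here is a minimal sketch of what that would look like
(untested, and assuming the extension syntax documented for the upcoming
1.1): mapWith accepts a partial function, so the input tuple can be
destructured directly in the pattern.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._ // brings mapWith & co. into scope

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
      rankingsInputPath, job)
    .mapWith { case (_, text) => // the tuple is destructured in the pattern
      val Array(name, n, m) = text.toString.split(",")
      Ranking(name, n.toInt, m.toInt)
    }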

Since you're parsing textual data, it could also make sense to handle
error conditions for malformed inputs; here is an example that uses
flatMap to do so:

import org.apache.flink.util.Collector // needed for the two-argument flatMap
import scala.util.{Try, Success, Failure} // needed to work with the "functional" Try

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
      rankingsInputPath, job)
    .flatMap { (value: (LongWritable, Text), out: Collector[Ranking]) =>
      Try {
        val Array(name, n, m) = value._2.toString.split(",") // MatchError if the array size != 3
        Ranking(name, n.toInt, m.toInt) // NumberFormatException if n or m are not numbers
      } match {
        case Success(ranking) => out.collect(ranking) // emit only well-formed records
        case Failure(exception) => // deal with malformed input, perhaps log it
      }
    }

Feel free to ask for any clarification on the snippets [2] I posted; I'll
gladly help you further if you need it.

Last note: I'm not a user myself, but I believe Shapeless has some very
handy constructs for moving back and forth between tuples and case classes
(but please take this with a grain of salt).
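
If you want to explore that route, this is roughly what I have in mind
(again untested, and assuming I remember Shapeless's Generic correctly):
both tuples and case classes share an HList representation, so you can hop
from one to the other through it.

import shapeless.Generic

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val tupleGen = Generic[(String, Int, Int)] // tuple <-> HList
val rankingGen = Generic[Ranking] // case class <-> HList

// tuple -> case class, via the shared HList representation
val ranking: Ranking = rankingGen.from(tupleGen.to(("http://example.com", 1, 2)))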

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
[2]: I didn't test them, so caution is advisable ;)

On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro.schmidtke@gmail.com>
wrote:

> Hi everyone,
>
> first up, I'm new to Scala, so please bear with me, but I could not find
> any solution on the web or the Flink documentation. I'm having trouble
> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
> class. I got it to work, however in a way that I feel is too verbose for
> Scala:
>
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.scala._
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
>
> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
> val rankingsInput: DataSet[Ranking] =
>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>       rankingsInputPath, job)
>     .map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
>       override def map(value: (LongWritable, Text)) = {
>         val splits = value._2.toString.split(",")
>         new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>       }
>     })
>
>
> Is there a simpler way of doing this? All other variants I've tried yield
> some type information errors.
>
> Thanks in advance!
> Robert
>
> --
> My GPG Key ID: 336E2680
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
