flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: IDE complains my Filter operator in Scala
Date Tue, 10 Mar 2015 23:19:33 GMT
Hi Hung,

the problem is that the data set pldIndex contains elements of type
Tuple2[String, Int] whereas the defined filter function expects
AnnotatedVertex elements. Removing the trailing map operation where you
read the vertices should solve this problem.
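
A minimal sketch of the mismatch, using a plain Scala Seq as a stand-in for the Flink data set (an assumption made only to keep the example self-contained; the object name, the sample data and the keepSampled function are hypothetical):

```scala
object TypeMismatchSketch {
  case class AnnotatedVertex(annotation: String, id: Int)

  // Hypothetical stand-in for GraphUtils.readVertices(PLDIndexFile)
  val vertices: Seq[AnnotatedVertex] = Seq(
    AnnotatedVertex("a.com", 1),
    AnnotatedVertex("b.org", 2),
    AnnotatedVertex("c.net", 3)
  )

  // Stand-in for the filter function: it is typed on AnnotatedVertex
  val keepSampled: AnnotatedVertex => Boolean = v => Set(1, 3).contains(v.id)

  // With the trailing map, the elements become (String, Int) tuples
  val pldIndex: Seq[(String, Int)] = vertices.map(v => (v.annotation, v.id))
  // pldIndex.filter(keepSampled)  // does not compile: expects ((String, Int)) => Boolean

  // Without the trailing map, the element type matches the filter
  val pruned: Seq[AnnotatedVertex] = vertices.filter(keepSampled)
}
```

If the tuple form is still needed for the CSV output, the map can be applied after the filter instead of before it.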

Another problem is that in the open method of the RichFilterFunction you
call map on a Java list. The method getBroadcastVariable returns a
java.util.List object, which does not support this method. You can easily
solve this problem by importing
scala.collection.JavaConverters.asScalaBufferConverter in the scope of the
open method and then calling

getRuntimeContext.getBroadcastVariable[Tuple1[Int]]("vertices").asScala.map { _._1 }.toSet
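
The conversion can be tried without Flink. The sketch below assumes a hypothetical broadcastVertices helper that stands in for getRuntimeContext.getBroadcastVariable and returns a java.util.List; the rest is plain Scala. On Scala 2.13 and later, scala.jdk.CollectionConverters._ replaces the deprecated JavaConverters import.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._

object BroadcastConversionSketch {
  // Hypothetical stand-in for
  // getRuntimeContext.getBroadcastVariable[Tuple1[Int]]("vertices")
  def broadcastVertices(): JList[Tuple1[Int]] = {
    val list = new JArrayList[Tuple1[Int]]()
    list.add(Tuple1(1))
    list.add(Tuple1(3))
    list.add(Tuple1(3)) // duplicate, collapsed by toSet below
    list
  }

  // asScala wraps the Java list in a Scala Buffer, so map and toSet are available
  val vertices: Set[Int] = broadcastVertices().asScala.map(_._1).toSet
}
```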

If you should encounter other problems, then let us know.

Greets,

Till

On Tue, Mar 10, 2015 at 5:00 PM, HungChang <unicorn.banachi@gmail.com>
wrote:

> Hi,
>
> The IDE complains about the data type in my Filter operator in Scala:
> "overloaded method value filter with alternatives: (fun: ((String, Int)) ⇒
> Boolean cannot be applied..."
>
> I am quite new to Scala. I think the problem is that the types in the map
> operator and the filter operator don't match:
>
> val pldIndex = GraphUtils.readVertices(PLDIndexFile).map { vertex => Tuple2(vertex.annotation, vertex.id) }
>
> class PruneToSampleVerticesFilter extends RichFilterFunction[AnnotatedVertex]
>
> Do you have any suggestion as to which direction I should take to solve
> this?
>
> Best regards,
>
> Hung
>
>
> ----------------------------------------------------------------------------------------------------------
>   def compute(trackingGraphFile: String, PLDIndexFile: String, outputPath: String) = {
>
>     implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
>     // Vertex => VertexName, VertexID
>     val pldIndex = GraphUtils.readVertices(PLDIndexFile).map { vertex => Tuple2(vertex.annotation, vertex.id) }
>     val trackingGraphEdges = GraphUtils.readEdges(trackingGraphFile)
>
>     val trackingGraphVertices = trackingGraphEdges.map { edge => Tuple1(edge.target) }
>                                                   .distinct
>
>     // IDE complains this filter
>     val prunedPldIndex = pldIndex.filter(new PruneToSampleVerticesFilter)
>                                  .withBroadcastSet(trackingGraphVertices, "vertices")
>
>     prunedPldIndex.writeAsCsv(outputPath, fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)
>
>     env.execute()
>   }
>
>   class PruneToSampleVerticesFilter extends RichFilterFunction[AnnotatedVertex] {
>
>     var vertices: Set[Int] = null
>
>     override def open(parameters: Configuration) = {
>       vertices = getRuntimeContext.getBroadcastVariable[Tuple1[Int]]("vertices").map { _._1 }.toSet
>     }
>
>     override def filter(vertex: AnnotatedVertex): Boolean = {
>       vertices.contains(vertex.id)
>     }
>   }
>
>
> ------------------------------------------------------------------------------------------------------------------
> // case class
> case class AnnotatedVertex(annotation: String, id: Int)
> case class Edge(src: Int, target: Int)
>
>
>
