flink-dev mailing list archives

From Sergio Ramírez <srami...@correo.ugr.es>
Subject RichMapPartitionFunction - problems with collect
Date Tue, 22 Mar 2016 15:53:30 GMT
Hi all,

I've been having some problems with RichMapPartitionFunction. First, I
tried to convert the iterable into an array, without success. Then I
used buffers to store the values per column. I am trying to
transpose the local matrix of LabeledVectors that I have in each partition.

Neither of these solutions has worked. For example, for partition 7 and
feature 10 the vector is empty, whereas for the same partition and
feature 11 the vector contains 200 elements. This changes on each
execution, affecting different partitions and features.

I think there is a problem with calling the collect method outside the
loop over the iterable.

new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  def mapPartition(it: java.lang.Iterable[LabeledVector],
                   out: Collector[((Int, Int), Array[Byte])]): Unit = {
    val index = getRuntimeContext().getIndexOfThisSubtask()
    val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    for (i <- 0 until nFeatures)
      out.collect((i, index) -> mat(i).toArray) // numPartitions
  }
}
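
To check the buffering logic on its own, the same per-partition transposition can be sketched in plain Scala, without Flink. This is only a minimal sketch under assumptions: `Rec` and `TransposeSketch.transpose` are hypothetical names standing in for LabeledVector and the body of mapPartition, and the feature values are assumed to be dense and to fit in a Byte.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a LabeledVector: a label plus dense feature values.
final case class Rec(label: Double, features: Vector[Double])

object TransposeSketch {
  // Transposes the records of one partition into nFeatures byte columns.
  // Columns 0 .. nFeatures-2 hold the feature values; the last column
  // holds the class label mapped through classMap.
  def transpose(
      records: Iterable[Rec],
      nFeatures: Int,
      classMap: Map[Double, Byte],
      index: Int): Seq[((Int, Int), Array[Byte])] = {
    // One growable buffer per output column.
    val mat = Vector.fill(nFeatures)(new ListBuffer[Byte])
    for (reg <- records) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.features(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    // Emit every column after all records have been consumed,
    // keyed by (column index, partition index).
    (0 until nFeatures).map(i => (i, index) -> mat(i).toArray)
  }
}
```

In this sketch every column ends up with exactly one entry per record, so an empty column next to a full one should not be possible. If the standalone version behaves correctly on the same data, the cause probably lies outside the buffering loop itself.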

Regards
