flink-dev mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: RichMapPartitionFunction - problems with collect
Date Thu, 31 Mar 2016 08:07:48 GMT
Hi Sergio,

could you please provide a complete example (including input data) to
reproduce your problem. It is hard to tell what's going wrong when one only
sees a fraction of the program.

Cheers,
Till

On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <sramirez@correo.ugr.es>
wrote:

> Hi again,
>
> I haven't been able to solve the problem with the suggestion you gave me.
> I also tried using static variables (matrices), without success. Finally, I
> tried this simpler code:
>
>
> def mapPartition(it: java.lang.Iterable[LabeledVector],
>                  out: Collector[((Int, Int), Int)]): Unit = {
>   val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
>   var ninst = 0
>   for (reg <- it.asScala) {
>     requireByteValues(reg.vector)
>     ninst += 1
>   }
>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
> }
>
> The result is as follows:
>
> Attribute 10, partitions 0 to 7:
> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
> Attribute 12, partitions 0 to 7:
> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>
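> As you can see, partition 6, for example, reports a different number of
> instances for each attribute (201 for attribute 10, 200 for attribute 12).
> That should be impossible, since ninst is computed once per partition and
> emitted unchanged for every attribute.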
>
>
> On 24/03/16 22:39, Chesnay Schepler wrote:
>
>> Haven't looked too deeply into this, but it sounds like object reuse is
>> enabled, in which case buffering values effectively causes you to store
>> the same value multiple times.
>>
>> Can you try disabling object reuse using
>> env.getConfig().disableObjectReuse()?
>>
>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>
>>> Hi all,
>>>
>>> I've been having some problems with RichMapPartitionFunction. First, I
>>> tried to convert the iterable into an array, without success. Then I
>>> used some buffers to store the values per column. I am trying to transpose
>>> the local matrix of LabeledVectors that I have in each partition.
>>>
>>> None of these solutions has worked. For example, for partition 7 and
>>> feature 10 the vector is empty, whereas for the same partition and feature
>>> 11 the vector contains 200 elements. This changes on each execution,
>>> affecting different partitions and features.
>>>
>>> I think there is a problem with calling the collect method outside the
>>> iterable loop.
>>>
>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>                    out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>     val mat = for (i <- 0 until nFeatures)
>>>       yield new scala.collection.mutable.ListBuffer[Byte]
>>>     for (reg <- it.asScala) {
>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>     }
>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>   }
>>> }
>>>
>>> Regards
>>>
>>>
>>
>
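A minimal plain-Scala sketch of the pitfall Chesnay describes, with no Flink involved. `ObjectReuseDemo`, `MutableRecord` and `reusingIterator` are hypothetical stand-ins for a runtime that hands the same instance to user code on every iteration, as Flink may do when object reuse is enabled. Buffering the references leaves every slot pointing at one object that holds the last value written; buffering copies keeps the values distinct.

```scala
import scala.collection.mutable.ArrayBuffer

object ObjectReuseDemo {
  // Hypothetical mutable record; its field is overwritten in place.
  final class MutableRecord(var value: Int) {
    def copy(): MutableRecord = new MutableRecord(value)
  }

  // Hypothetical iterator that returns the SAME instance on every step,
  // overwriting its field, the way a runtime with object reuse may behave.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRecord] = {
    val shared = new MutableRecord(0)
    values.iterator.map { v =>
      shared.value = v
      shared
    }
  }

  // Buffers references: every slot aliases the shared instance,
  // so all of them report the last value written.
  def bufferReferences(values: Seq[Int]): Seq[Int] = {
    val buf = ArrayBuffer.empty[MutableRecord]
    for (r <- reusingIterator(values)) buf += r
    buf.map(_.value).toSeq
  }

  // Buffers defensive copies: each slot keeps the value it was given.
  def bufferCopies(values: Seq[Int]): Seq[Int] = {
    val buf = ArrayBuffer.empty[MutableRecord]
    for (r <- reusingIterator(values)) buf += r.copy()
    buf.map(_.value).toSeq
  }
}
```

With input `Seq(1, 2, 3)`, `bufferReferences` yields `Seq(3, 3, 3)` while `bufferCopies` yields `Seq(1, 2, 3)`. Extracting primitive values before buffering (as the `.toByte` calls in the quoted code do) also avoids this aliasing.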

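The inconsistency Sergio reports can be checked mechanically: each call to `mapPartition` computes `ninst` once and emits it for every attribute, so all counts for a given partition index should agree. The sketch below (`PartitionCountCheck` and its members are hypothetical names) groups `((attribute, partition), count)` pairs by partition and returns the partitions whose counts disagree. `observed` holds the values transcribed from the two result lines quoted above; it is not new data.

```scala
object PartitionCountCheck {
  // Returns the partitions whose counts differ across attributes.
  // Each partition's count is computed once and emitted for every
  // attribute, so a consistent run should return an empty set.
  def inconsistentPartitions(results: Seq[((Int, Int), Int)]): Set[Int] =
    results
      .groupBy { case ((_, partition), _) => partition }
      .filter { case (_, rs) => rs.map(_._2).distinct.size > 1 }
      .keySet

  // ((attribute, partition), count) pairs transcribed from the two
  // result lines quoted in the thread (attributes 10 and 12).
  val observed: Seq[((Int, Int), Int)] = {
    val attr10 = Seq(201, 200, 201, 200, 200, 201, 201, 201)
    val attr12 = Seq(201, 201, 201, 200, 201, 200, 200, 201)
    attr10.zipWithIndex.map { case (n, p) => (10, p) -> n } ++
      attr12.zipWithIndex.map { case (n, p) => (12, p) -> n }
  }
}
```

Applied to `observed`, the check flags partitions 1, 4, 5 and 6, which includes the partition 6 mismatch pointed out in the thread.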