flink-dev mailing list archives

From Sergio Ramírez <srami...@correo.ugr.es>
Subject Re: RichMapPartitionFunction - problems with collect
Date Wed, 06 Apr 2016 15:01:20 GMT
Hello,

Ok, please find enclosed the test code and the input data.

Cheers

On 31/03/16 10:07, Till Rohrmann wrote:
> Hi Sergio,
>
> Could you please provide a complete example (including input data) to
> reproduce your problem? It is hard to tell what's going wrong when one only
> sees a fraction of the program.
>
> Cheers,
> Till
>
> On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <sramirez@correo.ugr.es>
> wrote:
>
>> Hi again,
>>
>> I haven't been able to solve the problem with the suggestion you gave me.
>> I've also tried static variables (matrices), unsuccessfully, as well as
>> this simpler code:
>>
>>
>> def mapPartition(it: java.lang.Iterable[LabeledVector],
>>                  out: Collector[((Int, Int), Int)]): Unit = {
>>   val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
>>   var ninst = 0
>>   for (reg <- it.asScala) {
>>     requireByteValues(reg.vector)
>>     ninst += 1
>>   }
>>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
>> }
>>
>> The result is as follows:
>>
>> Attribute 10, first eight partitions (0 to 7):
>> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
>> Attribute 12, first eight partitions (0 to 7):
>> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>>
>> As you can see, partition 6, for example, reports a different number of
>> instances for each attribute (201 for attribute 10, 200 for attribute 12),
>> which should be impossible.
>>
>>
>> On 24/03/16 22:39, Chesnay Schepler wrote:
>>
>>> I haven't looked too deeply into this, but it sounds like object reuse is
>>> enabled, in which case buffering values effectively causes you to store
>>> the same value multiple times.
>>>
>>> Can you try disabling object reuse using
>>> env.getConfig().disableObjectReuse()?
>>>
>>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've been having some problems with RichMapPartitionFunction. First, I
>>>> tried to convert the iterable into an array, unsuccessfully. Then I
>>>> used some buffers to store the values per column. I am trying to transpose
>>>> the local matrix of LabeledVectors that I have in each partition.
>>>>
>>>> None of these solutions has worked. For example, for partition 7 and
>>>> feature 10 the vector is empty, whereas for the same partition and feature
>>>> 11 the vector contains 200 elements. This changes on each execution,
>>>> affecting different partitions and features.
>>>>
>>>> I think there is a problem with calling the collect method outside the
>>>> iterable loop.
>>>>
>>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>                    out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>     val mat = for (i <- 0 until nFeatures) yield
>>>>       new scala.collection.mutable.ListBuffer[Byte]
>>>>     for (reg <- it.asScala) {
>>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>>     }
>>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>>   }
>>>> }
>>>>
>>>> Regards
>>>>
>>>>
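
For reference, the per-partition transpose from the first message can be sketched in plain Scala, independent of Flink, so the column layout can be checked in isolation. This is a minimal sketch under stated assumptions: `Row` is a hypothetical stand-in for FlinkML's `LabeledVector`, the partition's records arrive as a plain `Iterator` (inside a `RichMapPartitionFunction` that would be `it.asScala`), and, as in the original code, `nFeatures` counts the class column, so the last column holds `classMap(label)`. `TransposeSketch`, `transpose` and `keyed` are illustrative names only.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for FlinkML's LabeledVector (assumption, not the real class).
final case class Row(label: Double, features: Array[Double])

object TransposeSketch {
  // Converts one partition's rows into column-major form. As in the thread,
  // nFeatures includes the class column: columns 0 .. nFeatures - 2 hold the
  // features, column nFeatures - 1 holds the encoded class label.
  def transpose(rows: Iterator[Row], nFeatures: Int,
                classMap: Map[Double, Byte]): IndexedSeq[Array[Byte]] = {
    // fill evaluates its argument once per slot, so each column gets its own buffer.
    val cols = IndexedSeq.fill(nFeatures)(ArrayBuffer.empty[Byte])
    for (r <- rows) {
      for (i <- 0 until nFeatures - 1) cols(i) += r.features(i).toByte
      cols(nFeatures - 1) += classMap(r.label)
    }
    cols.map(_.toArray)
  }

  // Keys each column by (column index, partition index), like the records
  // passed to out.collect in the thread.
  def keyed(partitionIndex: Int,
            cols: IndexedSeq[Array[Byte]]): IndexedSeq[((Int, Int), Array[Byte])] =
    cols.zipWithIndex.map { case (col, i) => ((i, partitionIndex), col) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(0.0, Array(1.0, 2.0)), Row(1.0, Array(3.0, 4.0)))
    val cols = transpose(rows.iterator, nFeatures = 3,
                         classMap = Map(0.0 -> 0.toByte, 1.0 -> 1.toByte))
    // Every column of a partition should have one entry per row.
    for (((col, part), values) <- keyed(7, cols))
      println(s"column $col, partition $part: ${values.mkString(",")}")
  }
}
```

Because every row appends exactly one value to every column, all columns of one partition have the same length within a single run of this logic.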

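Chesnay's point about object reuse can be illustrated without Flink. The sketch below simulates a source that hands out one mutable instance for every element; `MutableRecord`, `reusingIterator` and `ObjectReuseDemo` are hypothetical names for this illustration, not Flink APIs. Buffering the references leaves every slot pointing at the last record, while buffering copies keeps each record. Extracting primitive values, as the `toByte` calls in the original code do, also copies them.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable record (assumption, not a Flink type).
final class MutableRecord(var label: Double, var values: Array[Double]) {
  def copy(): MutableRecord = new MutableRecord(label, values.clone())
}

object ObjectReuseDemo {
  // Simulates a runtime that reuses a single instance for every element,
  // similar in spirit to what can happen when object reuse is enabled.
  def reusingIterator(rows: Seq[(Double, Array[Double])]): Iterator[MutableRecord] = {
    val shared = new MutableRecord(0.0, Array.empty[Double])
    rows.iterator.map { case (label, values) =>
      shared.label = label
      shared.values = values
      shared // the same object is returned every time
    }
  }

  // Buffers references: every slot ends up pointing at the same object.
  def bufferReferences(it: Iterator[MutableRecord]): Seq[Double] = {
    val buf = ArrayBuffer.empty[MutableRecord]
    it.foreach(buf += _)
    buf.map(_.label).toSeq
  }

  // Buffers defensive copies: each slot keeps its own snapshot.
  def bufferCopies(it: Iterator[MutableRecord]): Seq[Double] = {
    val buf = ArrayBuffer.empty[MutableRecord]
    it.foreach(r => buf += r.copy())
    buf.map(_.label).toSeq
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(1.0 -> Array(0.1), 2.0 -> Array(0.2), 3.0 -> Array(0.3))
    // All labels equal the last one written: 3.0, 3.0, 3.0
    println(bufferReferences(reusingIterator(rows)).mkString(", "))
    // Each label is preserved: 1.0, 2.0, 3.0
    println(bufferCopies(reusingIterator(rows)).mkString(", "))
  }
}
```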

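The simplified count in the follow-up message suggests a sanity check that can be run on collected output: within one execution, every attribute emitted for a given partition should carry the same instance count. A minimal plain-Scala sketch, assuming partitions are modeled as in-memory `Seq`s; `PartitionCountSketch`, `counts` and `consistent` are hypothetical names for illustration.

```scala
object PartitionCountSketch {
  // Mirrors the simplified mapPartition: for each partition, emit
  // ((attribute, partitionIndex), numberOfInstances) once per attribute.
  def counts(partitions: Seq[Seq[Array[Double]]], nFeatures: Int): Seq[((Int, Int), Int)] =
    for {
      (part, index) <- partitions.zipWithIndex
      i <- 0 until nFeatures
    } yield (i, index) -> part.size

  // True if, for every partition, all attributes report the same count.
  def consistent(result: Seq[((Int, Int), Int)]): Boolean =
    result
      .groupBy { case ((_, partition), _) => partition }
      .values
      .forall(group => group.map(_._2).distinct.size == 1)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq.fill(3)(Array(0.0)), Seq.fill(2)(Array(0.0)))
    val result = counts(partitions, nFeatures = 2)
    println(result.mkString(", "))
    println(consistent(result))
  }
}
```

Applied to the two lines reported in the thread, partition 6 would fail this check (201 for attribute 10 versus 200 for attribute 12).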