flink-dev mailing list archives

From Sergio Ramírez <srami...@correo.ugr.es>
Subject Re: RichMapPartitionFunction - problems with collect
Date Wed, 13 Apr 2016 15:47:45 GMT
Hello again:

Any news about this problem with the RichMapPartitionFunction?

Thank you

On 06/04/16 17:01, Sergio Ramírez wrote:
> Hello,
>
> Ok, please find enclosed the test code and the input data.
>
> Cheers
>
> On 31/03/16 10:07, Till Rohrmann wrote:
>> Hi Sergio,
>>
>> Could you please provide a complete example (including input data) to
>> reproduce your problem? It is hard to tell what's going wrong when one
>> only sees a fraction of the program.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <sramirez@correo.ugr.es>
>> wrote:
>>
>>> Hi again,
>>>
>>> I've not been able to solve the problem with the instruction you gave me.
>>> I've also tried static variables (matrices), without success. I've also
>>> tried this simpler code:
>>>
>>>
>>> def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>                  out: Collector[((Int, Int), Int)]): Unit = {
>>>   val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
>>>   var ninst = 0
>>>   for (reg <- it.asScala) {
>>>     requireByteValues(reg.vector)
>>>     ninst += 1
>>>   }
>>>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
>>> }
>>>
>>> The result is as follows:
>>>
>>> Attribute 10, first eight partitions:
>>> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
>>>
>>> Attribute 12, first eight partitions:
>>> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>>>
>>>
>>> As you can see, partition 6, for example, reports a different number of
>>> instances for each attribute (201 vs. 200), which should be impossible.
>>>
>>>
>>> On 24/03/16 22:39, Chesnay Schepler wrote:
>>>
>>>> I haven't looked too deeply into this, but it sounds like object reuse is
>>>> enabled, at which point buffering values effectively causes you to store
>>>> the same value multiple times.
>>>>
>>>> Can you try disabling object reuse using
>>>> env.getConfig().disableObjectReuse()?
>>>>
>>>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've been having some problems with RichMapPartitionFunction. First, I
>>>>> tried to convert the iterable into an array, without success. Then I
>>>>> used some buffers to store the values per column. I am trying to
>>>>> transpose the local matrix of LabeledVectors that I have in each partition.
>>>>>
>>>>> None of these solutions has worked. For example, for partition 7 and
>>>>> feature 10, the vector is empty, whereas for the same partition and
>>>>> feature 11, the vector contains 200 elements. This changes on each
>>>>> execution, affecting different partitions and features.
>>>>>
>>>>> I think there is a problem with calling the collect method outside the
>>>>> loop over the iterable.
>>>>>
>>>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>                    out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>>     val mat = for (i <- 0 until nFeatures)
>>>>>       yield new scala.collection.mutable.ListBuffer[Byte]
>>>>>     for (reg <- it.asScala) {
>>>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>>>     }
>>>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>>>   }
>>>>> }
>>>>>
>>>>> Regards
>>>>>
>>>>>
>
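The inconsistency in the quoted output (the same partition reporting different instance counts for attributes 10 and 12) can be checked mechanically. The following is a minimal sketch in plain Scala, with no Flink dependency; the object name `PartitionCountCheck` and the helper `inconsistentPartitions` are hypothetical, and the data is copied from the two result lines quoted in this thread.

```scala
object PartitionCountCheck {
  // Output tuples copied from the thread: ((attribute, partition), instanceCount).
  val reported: Seq[((Int, Int), Int)] = Seq(
    ((10, 0), 201), ((10, 1), 200), ((10, 2), 201), ((10, 3), 200),
    ((10, 4), 200), ((10, 5), 201), ((10, 6), 201), ((10, 7), 201),
    ((12, 0), 201), ((12, 1), 201), ((12, 2), 201), ((12, 3), 200),
    ((12, 4), 201), ((12, 5), 200), ((12, 6), 200), ((12, 7), 201)
  )

  // Groups the rows by partition index and keeps only the partitions whose
  // attributes report more than one distinct instance count.
  def inconsistentPartitions(rows: Seq[((Int, Int), Int)]): Map[Int, Set[Int]] =
    rows
      .groupBy { case ((_, partition), _) => partition }
      .map { case (partition, group) => partition -> group.map(_._2).toSet }
      .filter { case (_, counts) => counts.size > 1 }

  def main(args: Array[String]): Unit = {
    // Print the disagreeing partitions in ascending order.
    inconsistentPartitions(reported).toSeq.sortBy(_._1).foreach {
      case (partition, counts) =>
        println(s"partition $partition reports counts ${counts.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

If each `mapPartition` call emitted the same `ninst` for every feature, the result would be empty. With the quoted data it is not, so the disagreement is not limited to partition 6.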

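The object-reuse effect described in the quoted reply ("buffering values effectively causes you to store the same value multiple times") can be illustrated without Flink. This is only a sketch under stated assumptions: `Record` and `reusingIterator` are hypothetical stand-ins for a runtime that hands out one mutable instance repeatedly, and they do not reproduce Flink's actual internals.

```scala
import scala.collection.mutable.ListBuffer

object ObjectReuseSketch {
  // Hypothetical mutable record, standing in for a deserialized input element.
  final class Record(var value: Int)

  // Simulates a runtime that reuses one instance for every element it hands out.
  def reusingIterator(values: Seq[Int]): Iterator[Record] = {
    val shared = new Record(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Buffers the references themselves: every slot points at the same object,
  // so all entries show the last value written.
  def bufferReferences(values: Seq[Int]): List[Int] = {
    val buf = ListBuffer.empty[Record]
    for (r <- reusingIterator(values)) buf += r
    buf.toList.map(_.value)
  }

  // Copies the primitive payload out before buffering, so later mutations
  // of the shared instance do not affect what was stored.
  def bufferCopies(values: Seq[Int]): List[Int] = {
    val buf = ListBuffer.empty[Int]
    for (r <- reusingIterator(values)) buf += r.value
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    println(bufferReferences(Seq(1, 2, 3))) // List(3, 3, 3)
    println(bufferCopies(Seq(1, 2, 3)))     // List(1, 2, 3)
  }
}
```

Note that the code in the first message already copies a `Byte` out of each record (`reg.vector(i).toByte`) before buffering, which resembles `bufferCopies` rather than `bufferReferences`. That may be one reason disabling object reuse did not change the outcome, though the thread does not confirm this.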
