flink-dev mailing list archives

From Sergio Ramírez <srami...@correo.ugr.es>
Subject Re: RichMapPartitionFunction - problems with collect
Date Thu, 28 Apr 2016 16:14:23 GMT
Hello,

OK, now I understand everything. So if I want to re-use my DataSet in 
several different operations, what should I do? Is there any way to 
keep the data around and save it from re-computation? I am not talking 
only about iteration; I mean general re-use of data.

For example, imagine I want to filter some results and collect them, 
and then apply another filter to the same DataSet.
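
A minimal sketch of the pattern I mean (hypothetical names, just for 
illustration):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data: DataSet[Int] = env.fromCollection(1 to 100)

// Each collect() eagerly triggers its own job execution, so the whole
// pipeline that builds `data` is recomputed for the second filter.
val evens = data.filter(_ % 2 == 0).collect()
val large = data.filter(_ > 50).collect()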

Regards

On 26/04/16 14:25, Till Rohrmann wrote:
> Hi Sergio,
>
> sorry for the late reply. I figured out your problem. The reason why you
> see apparently inconsistent results is that you execute your job multiple
> times. Each collect call triggers an eager execution of your Flink job.
> Since the intermediate results are not stored, the whole topology has to
> be re-executed for every collect call. And since the input split
> assignment of your libSVM file happens lazily, the different subtasks may
> get different splits of the input file assigned on each run. That is why
> you see different lengths for different features of the same partition.
>
> If you replace the last 6 lines of your program with:
>
> transposed.filter(_._1._1 == (nFeatures - 1))
>   .map(t => t._1 -> t._2.size)
>   .reduceGroup(_.mkString(","))
>   .output(new PrintingOutputFormat())
>
> transposed.filter(_._1._1 == 10)
>   .map(t => t._1 -> t._2.size)
>   .reduceGroup(_.mkString(","))
>   .output(new PrintingOutputFormat())
>
> transposed.filter(_._1._1 == 12)
>   .map(t => t._1 -> t._2.size)
>   .reduceGroup(_.mkString(","))
>   .output(new PrintingOutputFormat())
>
> env.execute()
>
> you should see the correct results. If you need a deterministic input
> split assignment to the different sources, then you would have to
> implement your own InputFormat which returns a special InputSplitAssigner
> that performs the deterministic assignment. Or simply avoid collect,
> count and print, which trigger the eager execution of a Flink job.
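>
> A rough sketch of that InputFormat idea (untested and simplified: the
> hypothetical assigner below hands split i to subtask i and assumes you
> create exactly one split per subtask):
>
> import org.apache.flink.core.io.{InputSplit, InputSplitAssigner}
>
> class DeterministicAssigner(splits: Array[InputSplit])
>     extends InputSplitAssigner {
>
>   // remembers which subtasks already received their split
>   private val handedOut = scala.collection.mutable.Set.empty[Int]
>
>   override def getNextInputSplit(host: String, taskIndex: Int): InputSplit =
>     synchronized {
>       // key the assignment on the subtask index, not on request order
>       if (taskIndex < splits.length && handedOut.add(taskIndex))
>         splits(taskIndex)
>       else
>         null // this subtask has nothing (more) to read
>     }
> }
>
> // In your InputFormat subclass you would then override
> // getInputSplitAssigner(splits) to return this assigner.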
>
> Cheers,
> Till
>
> On Wed, Apr 13, 2016 at 5:47 PM, Sergio Ramírez <sramirez@correo.ugr.es>
> wrote:
>
>> Hello again:
>>
>> Any news about this problem with the enriched MapPartition function?
>>
>> Thank you
>>
>>
>> On 06/04/16 17:01, Sergio Ramírez wrote:
>>
>>> Hello,
>>>
>>> Ok, please find enclosed the test code and the input data.
>>>
>>> Cheers
>>>
>>> On 31/03/16 10:07, Till Rohrmann wrote:
>>>
>>>> Hi Sergio,
>>>>
>>>> could you please provide a complete example (including input data) to
>>>> reproduce your problem? It is hard to tell what's going wrong when one
>>>> only sees a fraction of the program.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <sramirez@correo.ugr.es>
>>>> wrote:
>>>>
>>>>> Hi again,
>>>>>
>>>>> I've not been able to solve the problem with the instructions you
>>>>> gave me. I've tried static variables (matrices), also unsuccessfully.
>>>>> I've also tried this simpler code:
>>>>>
>>>>>
>>>>> def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>                  out: Collector[((Int, Int), Int)]): Unit = {
>>>>>   val index = getRuntimeContext().getIndexOfThisSubtask() // partition index
>>>>>   var ninst = 0
>>>>>   for (reg <- it.asScala) {
>>>>>     requireByteValues(reg.vector)
>>>>>     ninst += 1
>>>>>   }
>>>>>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
>>>>> }
>>>>>
>>>>> The result is as follows:
>>>>>
>>>>> Attribute 10, first eight partitions:
>>>>> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
>>>>>
>>>>> Attribute 12, first eight partitions:
>>>>> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>>>>>
>>>>>
>>>>> As you can see, for block 6, for example, the two attributes report a
>>>>> different number of instances (201 vs. 200), which should be
>>>>> impossible since both counts come from the same partition.
>>>>>
>>>>>
>>>>> On 24/03/16 22:39, Chesnay Schepler wrote:
>>>>>
>>>>>> Haven't looked too deeply into this, but this sounds like object
>>>>>> reuse is enabled, in which case buffering values effectively causes
>>>>>> you to store the same (mutated) object multiple times.
>>>>>>
>>>>>> Can you try disabling object reuse using
>>>>>> env.getConfig().disableObjectReuse()?
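>>>>>>
>>>>>> For reference, a minimal sketch of where that call goes (assuming the
>>>>>> standard Scala batch setup):
>>>>>>
>>>>>> import org.apache.flink.api.scala._
>>>>>>
>>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> // object reuse is disabled by default; this makes the choice explicit
>>>>>> env.getConfig.disableObjectReuse()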
>>>>>>
>>>>>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been having some problems with RichMapPartitionFunction. First,
>>>>>>> I tried to convert the iterable into an array, unsuccessfully. Then
>>>>>>> I used some buffers to store the values per column, since I am
>>>>>>> trying to transpose the local matrix of LabeledVectors that I have
>>>>>>> in each partition.
>>>>>>>
>>>>>>> None of these solutions has worked. For example, for partition 7 and
>>>>>>> feature 10 the vector is empty, whereas for the same partition and
>>>>>>> feature 11 the vector contains 200 elements. And this changes on
>>>>>>> each execution, affecting different partitions and features.
>>>>>>>
>>>>>>> I think there is a problem with calling the collect method outside
>>>>>>> the loop over the iterable.
>>>>>>>
>>>>>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>>>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>>>                    out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>>>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>>>>     val mat = for (i <- 0 until nFeatures)
>>>>>>>       yield new scala.collection.mutable.ListBuffer[Byte]
>>>>>>>     for (reg <- it.asScala) {
>>>>>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>>>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>>>>>     }
>>>>>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>>
>>>>>>>

