beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lukasz Cwik <lc...@google.com>
Subject Re: Resampling a timeserie stuck on a GroupByKey
Date Wed, 16 Aug 2017 21:46:18 GMT
Do you have some job ids that you could share?

On Wed, Aug 16, 2017 at 1:18 PM, Tristan Marechaux <
tristan.marechaux@walnut-algo.com> wrote:

> Thanks for the invitation and for the answer.
>
> I tried the Count resample function and I still have the same issue, so I
> guess it doesn't come from my resample function, but here is the code in
> case :
>
> def resample_function(candles):
>
>     sorted_candles = sorted(filter(lambda x: x.date is not None, candles), key=lambda
candle: candle.date)
>     if len(sorted_candles) > 0:
>         return Candle(
>             sorted_candles[-1].date,
>             sorted_candles[0].open,
>             max(candle.high for candle in candles),
>             min(candle.low for candle in candles),
>             sorted_candles[-1].close,
>             sum((candle.volume for candle in candles), .0)
>         )
>
>
> The fact is that the pipeline seems stucked on the GroupByKey inside the
> CombineGlobaly PTransform before the call of my resample_function (if the
> GCP web interface is accurate).
>
> I tried the with to have in my pipeline only native python type with the
> CountCombineFn and it's still stucked.
>
> Here is what I can see on my GCP console (this screenshot shows 36 minutes
> by I waited for 5 hours to be sure) :
> [image: Selection_070.png]
>
>
> On Wed, Aug 16, 2017 at 1:08 AM Lukasz Cwik <lcwik@google.com> wrote:
>
>> I have invited you to the slack channel.
>>
>> 2 million data points doesn't seem like it should be an issue.
>> Have you considered trying a simpler combiner like Count to see if the
>> bottleneck is with the combiner that you are supplying?
>> Also, could you share the code for what resample_function does?
>>
>> On Mon, Aug 14, 2017 at 2:43 AM, Tristan Marechaux <
>> tristan.marechaux@walnut-algo.com> wrote:
>>
>>> Hi all,
>>>
>>> I wrote a Beam Pipeline written with the python SDK that resample a
>>> timeseries containing data points everery minute to a 5-minutes timeserie.
>>>
>>> My pipeline looks like:
>>> input_data | WindowInto(FixedWindows(size=timedelta(minutes=5).total_seconds()))
>>> | CombineGlobaly(resample_function)
>>>
>>> When I run it with the local or DataFlow runner with a small dataset, it
>>> works and does what I want.
>>>
>>> But when I try to run it on the DataFlow runner with a bigger dataset (1
>>> 700 000 datapoints timestamped over 15 years) it stay stuck for hours on
>>> the GroupByKey step of CombineGlobaly.
>>>
>>> My question is : Did I do something wrong with the design of my pipeline?
>>>
>>> PS: Can someone invite me to the slack channel?
>>> --
>>>
>>> Tristan Marechaux
>>>
>>> Data Scientist | *Walnut Algorithms*
>>>
>>> Mobile : +33 627804399 <+33627804399>
>>>
>>> Email: tristan.marechaux@walnut-algo.com
>>>
>>> Web: www.walnutalgorithms.com
>>>
>>
>> --
>
> Tristan Marechaux
>
> Data Scientist | *Walnut Algorithms*
>
> Mobile : +33 627804399 <+33627804399>
>
> Email: tristan.marechaux@walnut-algo.com
>
> Web: www.walnutalgorithms.com
>

Mime
View raw message