beam-user mailing list archives

From Vilhelm von Ehrenheim <vonehrenh...@gmail.com>
Subject Re: Global sum of latest help
Date Tue, 05 Dec 2017 20:58:31 GMT
That made all the sense and solved my problem! Thank you so much!

On Tue, Dec 5, 2017 at 9:05 PM, Kenneth Knowles <klk@google.com> wrote:

> I think the problem with your coder is that you specified that the
> accumulator type is a HashMap, which is more specific than just Map. Beam's
> coder inference won't select the MapCoder (which only guarantees you get a Map
> back, not a HashMap) and falls back to SerializableCoder, which is "all or
> nothing" and doesn't look at coders registered for any type parameters. If
> you change it to a Map<String, Funding> then you should see MapCoder
> selected, and it will recursively choose AvroCoder for your types.
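
To illustrate the "all or nothing" behaviour, here is a minimal plain-Java sketch with no Beam involved. The stack trace in this thread shows SerializableCoder.encode calling ObjectOutputStream.writeObject, so the sketch does the same with a HashMap whose values are not Serializable. The `Funding` class and the `canJavaSerialize` helper are hypothetical stand-ins for illustration, not the real model class or any Beam API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

class JavaSerializationDemo {
    // Hypothetical stand-in for the model class: deliberately NOT Serializable.
    static class Funding {
        final String id;
        Funding(String id) { this.id = id; }
    }

    /** Returns true if plain Java serialization of the value succeeds. */
    static boolean canJavaSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // HashMap itself is Serializable, but it writes every entry,
            // so one non-serializable value fails the whole map.
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Funding> accumulator = new HashMap<>();
        System.out.println(canJavaSerialize(accumulator)); // empty map: true
        accumulator.put("1", new Funding("1"));
        System.out.println(canJavaSerialize(accumulator)); // map with a Funding: false
    }
}
```

Java serialization never consults a coder registered for the value type, which matches the NotSerializableException for Funding at the bottom of the trace.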
>
> On Tue, Dec 5, 2017 at 11:55 AM, Vilhelm von Ehrenheim <
> vonehrenheim@gmail.com> wrote:
>
>> The error got a bit strange there.
>>
>> Here it is with line breaks:
>>
>> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62, sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1), org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null, type=null, investors=[])}
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>> java.util.HashMap.writeObject(HashMap.java:1362)
>> sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:498)
>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> On Tue, Dec 5, 2017 at 8:52 PM, Vilhelm von Ehrenheim <
>> vonehrenheim@gmail.com> wrote:
>>
>>> No, the order is not so important as long as it is correct and doesn't
>>> emit sums for late values.
>>>
>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>
>>> Would produce 3, 4 then 5
>>>
>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>
>>> would produce only 2 and 5 (value 1 is excluded as it is too late
>>> compared to value 2).
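
The two orderings above can be checked with a small plain-Java simulation. This is only a sketch of the intended semantics, not Beam code: the `Event` class and `emittedSums` method are hypothetical names, all events are assumed to share one `parent_id`, and a sum is assumed to be emitted after every accepted update.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SumOfLatestSketch {
    // Hypothetical simplified event mirroring the JSON examples.
    static final class Event {
        final String id;
        final long timestamp;
        final long amount;
        Event(String id, long timestamp, long amount) {
            this.id = id;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    /** Feeds events in arrival order and returns every sum that gets emitted. */
    static List<Long> emittedSums(List<Event> arrivals) {
        Map<String, Event> latestById = new HashMap<>();
        List<Long> sums = new ArrayList<>();
        for (Event e : arrivals) {
            Event current = latestById.get(e.id);
            if (current != null && current.timestamp >= e.timestamp) {
                continue; // late update: keep the newer record and emit nothing
            }
            latestById.put(e.id, e);
            long sum = 0;
            for (Event latest : latestById.values()) {
                sum += latest.amount;
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(emittedSums(Arrays.asList(
                new Event("2", 2, 3), new Event("1", 1, 1), new Event("1", 3, 2)))); // [3, 4, 5]
        System.out.println(emittedSums(Arrays.asList(
                new Event("1", 3, 2), new Event("2", 2, 3), new Event("1", 1, 1)))); // [2, 5]
    }
}
```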
>>>
>>> After your tips I wrote up a custom CombineFn that does this by saving
>>> the latest records and computing the result as it extracts the output. The
>>> data examples I sent were a bit simplified but the result is similar.
>>> The Funding class just has a few more fields. It is also used successfully
>>> in a lot of places.
>>>
>>> Example Funding object:
>>>
>>> Funding(id=2, updatedAt=1491868800000, version=2, org=the-empire, raisedAmountUsd=2, announcedOn=1292284800000, type="A")
>>>
>>> Here is the CombineFn:
>>>
>>> public class SumLatestFundingFn extends Combine.CombineFn<Funding, HashMap<String,Funding>, SumLatestFundingFn.Result> {
>>>     @Data
>>>     @DefaultCoder(AvroCoder.class)
>>>     public static class Result {
>>>         Long totalFunding;
>>>         Funding latestFunding;
>>>
>>>         public Result() {}
>>>         public Result(Long totalFunding, Funding latestFunding) {
>>>             this.totalFunding = totalFunding;
>>>             this.latestFunding = latestFunding;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public HashMap<String, Funding> createAccumulator() { return new HashMap<>(); }
>>>
>>>     @Override
>>>     public HashMap<String,Funding> addInput(HashMap<String,Funding> accum, Funding input) {
>>>         // Keep only the highest version seen so far for each id.
>>>         if (!accum.containsKey(input.getId()) ||
>>>                 input.getVersion() > accum.get(input.getId()).getVersion()) {
>>>             accum.put(input.getId(), input);
>>>         }
>>>         return accum;
>>>     }
>>>
>>>     @Override
>>>     public HashMap<String,Funding> mergeAccumulators(Iterable<HashMap<String,Funding>> accums) {
>>>         HashMap<String,Funding> merged = createAccumulator();
>>>         for (HashMap<String,Funding> accum : accums) {
>>>             for (Funding funding : accum.values()) {
>>>                 merged = addInput(merged, funding);
>>>             }
>>>         }
>>>         return merged;
>>>     }
>>>
>>>     @Override
>>>     public Result extractOutput(HashMap<String,Funding> accum) {
>>>         Long totalFunding = accum.values().stream()
>>>                 .mapToLong(funding -> firstNonNull(funding.getRaisedAmountUsd(), 0L)).sum();
>>>
>>>         Funding latestFunding = accum.values().stream()
>>>                 // Long.compare avoids the overflow of casting a long difference to int.
>>>                 .max((first, second) -> Long.compare(
>>>                         firstNonNull(first.getAnnouncedOn(), Long.MIN_VALUE),
>>>                         firstNonNull(second.getAnnouncedOn(), Long.MIN_VALUE)))
>>>                 .orElse(new Funding());
>>>
>>>         return new Result(totalFunding, latestFunding);
>>>     }
>>> }
>>>
>>> I’m using Lombok annotations to generate getters, setters, equals and
>>> hashCode. This works in a lot of pipelines I have already.
>>>
>>> This works great when testing it with TestStream but I get a nasty error
>>> in Dataflow when I use a Repeatedly.forever(AfterPane.elementCountAtLeast(1))
>>> trigger. I tried with a less eager trigger but got the same error. If I
>>> remove Repeatedly.forever() the pipeline works but gives me incorrect
>>> results as the trigger only fires once.
>>>
>>> Here is the error:
>>>
>>> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745) Caused by: org.apache.beam.sdk.coders.CoderException:
>>> unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62,
>>> sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1),
>>> org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null,
>>> type=null, investors=[])} org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47) org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745) Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> java.util.HashMap.internalWriteEntries(HashMap.java:1785) java.util.HashMap.writeObject(HashMap.java:1362)
>>> sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> java.lang.reflect.Method.invoke(Method.java:498) java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124) org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>>> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143) com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> java.lang.Thread.run(Thread.java:745)
>>>
>>> What I find very strange is that the error is from the
>>> SerializableCoder. I have specified DefaultCoder(AvroCoder.class) on all my
>>> classes (including Funding).
>>>
>>> Do you think this is a bug or am I missing something? Really strange
>>> that the tests work and that it is fine as long as I do not use
>>> Repeatedly.forever.
>>>
>>> Really thankful for your help!
>>>
>>> // Vilhelm
>>>
>>> On 5 Dec 2017 02:00, “Lukasz Cwik” <lcwik@google.com> wrote:
>>>
>>>> I believe you can provide ordering if you decide to put any unconsumed
>>>> records into state. Each time, you read the state and check whether it's the
>>>> next corresponding id. If so, emit the new sum; otherwise push it back
>>>> onto state until you get the missing ids, allowing you to backfill all the
>>>> prior values that should have been emitted.
>>>>
>>>> On Mon, Dec 4, 2017 at 4:26 PM, Kenneth Knowles <klk@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 4, 2017 at 3:22 PM, Lukasz Cwik <lcwik@google.com> wrote:
>>>>>
>>>>>> Since processing can happen out of order, for example if the input
>>>>>> was:
>>>>>> ```
>>>>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>>>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>>>> ```
>>>>>> would the output be 3 and then 5 or would you still want 1, 4, and
>>>>>> then 5?
>>>>>>
>>>>>
>>>>> My own guess here would be 2, 3, then 5.
>>>>>
>>>>> You won't be able to do this with a sequence of summations, but you
>>>>> could Combine.perKey() where the per-"parent_id" accumulator tracks the
>>>>> latest value and timestamp for each "id". The trouble is that in the
>>>>> global window, if you have an unbounded domain for either "id" or
>>>>> "parent_id", you won't be able to collect any expired state. You can
>>>>> accomplish the same with a stateful ParDo using a MapState, and gain tight
>>>>> control over when to output. But you have the same question to answer: how
>>>>> do you decide when a value is safe to forget about? (or safe to merge into
>>>>> a global bucket because it won't be overwritten any more)
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>
>>>>>> On Mon, Dec 4, 2017 at 2:13 PM, Vilhelm von Ehrenheim <
>>>>>> vonehrenheim@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all!
>>>>>>> First of all, great work on the 2.2.0 release! Really excited to
>>>>>>> start using it.
>>>>>>>
>>>>>>> I have a problem with how I should construct a pipeline that should
>>>>>>> emit a sum of latest values, which I hope someone might have some ideas on
>>>>>>> how to solve.
>>>>>>>
>>>>>>> Here is what I have:
>>>>>>>
>>>>>>> I have a stateful stream of events that contain updates to a long
>>>>>>> amongst other things. These events look something like this:
>>>>>>>
>>>>>>> ```
>>>>>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>>>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>>>>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>>>>> ```
>>>>>>>
>>>>>>> I want to emit sums of the `amount` per `parent_id` but only using
>>>>>>> the latest record per `id`. Here that would result in sums of 1, 4 and then
>>>>>>> 5.
>>>>>>>
>>>>>>> To make it harder I need to do this in a global window with
>>>>>>> triggering based on element count. I could maybe combine that with a
>>>>>>> processing time trigger though. At least I need a global sum over all
>>>>>>> events.
>>>>>>>
>>>>>>> I have tried to do this with Latest.perKey and Sum.perKey but as you
>>>>>>> probably realize that will give some strange results as the downstream sum
>>>>>>> will not discard elements that are replaced by newer updates in the latest
>>>>>>> transform.
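
A plain-Java simulation of that effect, as a sketch only: it does not use Beam, and it assumes the upstream "latest" stage forwards each new latest amount while the downstream sum just adds whatever it receives. The `naiveSums` method is a hypothetical helper for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class NaiveLatestThenSum {
    /** Running totals when every forwarded "latest" amount is simply added. */
    static List<Long> naiveSums(long[] forwardedAmounts) {
        List<Long> sums = new ArrayList<>();
        long total = 0;
        for (long amount : forwardedAmounts) {
            total += amount; // nothing subtracts the amount this update replaced
            sums.add(total);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Amounts from the three example events, in timestamp order.
        System.out.println(naiveSums(new long[] {1, 3, 2})); // [1, 4, 6]
    }
}
```

The last total is 6 rather than the wanted 5, because the first amount for id "1" is still counted after the newer record replaces it.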
>>>>>>>
>>>>>>> I also thought I could write a custom CombineFn for this but I need
>>>>>>> to do it for different keys which leaves me really confused.
>>>>>>>
>>>>>>> Any help or pointers are greatly appreciated.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Vilhelm von Ehrenheim
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
