beam-user mailing list archives

From Lukasz Cwik <lc...@google.com>
Subject Re: Re: Creating side input map with global window
Date Fri, 23 Jun 2017 16:03:18 GMT
To unit test your function, have it accept a supplier, with the default
supplier being the one that returns a reference to the static instance, and
pass a different supplier in for testing purposes.
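
Roughly something like this (untested sketch; SchemaCacheLoader, TypesCache and
blobIdFor(...) are stand-ins for your GCS-backed loader and types):

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.storage.BlobId;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import org.apache.beam.sdk.transforms.DoFn;

public class ParseFn extends DoFn<String, TableRow> {

    // The supplier has to be serializable because the DoFn is shipped to workers.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // One cache per worker JVM, shared by every instance of this DoFn.
    private static final LoadingCache<BlobId, TypesCache> CACHE =
            CacheBuilder.newBuilder()
                        .refreshAfterWrite(30, TimeUnit.MINUTES)
                        .build(new SchemaCacheLoader()); // your GCS-backed CacheLoader

    private final SerializableSupplier<LoadingCache<BlobId, TypesCache>> cacheSupplier;

    // Production constructor: defaults to the static, GCS-backed cache.
    public ParseFn() {
        this(() -> CACHE);
    }

    // Test constructor: pass a supplier whose cache loads from a local file instead.
    ParseFn(SerializableSupplier<LoadingCache<BlobId, TypesCache>> cacheSupplier) {
        this.cacheSupplier = cacheSupplier;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        TypesCache types = cacheSupplier.get().getUnchecked(blobIdFor(c.element()));
        // ... parse c.element() with the cached types and c.output(...) the result ...
    }
}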

On Fri, Jun 23, 2017 at 8:23 AM, Kevin Peterson <kevincp@google.com> wrote:

> Hey Lukasz,
>
> I tried using the setup function, but since this is a streaming pipeline, the
> batches tend to be pretty small. I could force the pipeline to batch things
> up, but that feels like something that shouldn't be needed. I was already
> caching between elements within a thread; the problem was at pipeline
> start, or when a new instance was started, since each thread has its own
> cache.
>
> Using a static cache worked!
>
> private static final LoadingCache<BlobId, TypesCache> CACHE =
>         CacheBuilder.newBuilder()
>                     .refreshAfterWrite(30, TimeUnit.MINUTES)
>                     .build(new CacheLoader());
>
> This has gotten me unblocked, but isn't a perfect solution. Because the
> cache is static, I can't set any of its parameters, which makes it very
> hard to unit test because it is hard-coded to access cloud storage instead
> of a local file.
>
> I tried using a Singleton to hold the cache and fetch it from the DoFn,
> but it seems like the Singleton isn't shared amongst all of the threads. I
> can see from the logs that all of the DoFn calls are on the same worker
> instance but on different threads, and I see a log statement from inside my
> synchronized block for each thread, which shouldn't be possible.
>
> Thoughts?
>
>
> On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik <lcwik@google.com> wrote:
>
>> Take a look at DoFn @Setup/@Teardown; they are called only once per DoFn
>> instance and not per element, which makes it easier to write initialization code.
>>
>> Also if the schema map is shared, have you thought of using a single
>> static instance of Guava's LoadingCache shared amongst all the DoFn
>> instances?
>>
>> You can also refresh the data stored within the cache periodically.
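>>
>> For example (rough sketch, untested; loadSchemas() is a placeholder for
>> however you fetch the schema map from storage):
>>
>> import java.util.Map;
>> import java.util.concurrent.TimeUnit;
>> import com.google.api.services.bigquery.model.TableRow;
>> import com.google.common.cache.CacheBuilder;
>> import com.google.common.cache.CacheLoader;
>> import com.google.common.cache.LoadingCache;
>> import org.apache.beam.sdk.transforms.DoFn;
>>
>> public class EnrichFn extends DoFn<String, TableRow> {
>>
>>     // One cache per worker JVM, shared across all DoFn instances and threads.
>>     private static final LoadingCache<String, Map<String, String>> SCHEMAS =
>>             CacheBuilder.newBuilder()
>>                         .refreshAfterWrite(30, TimeUnit.MINUTES)
>>                         .build(new CacheLoader<String, Map<String, String>>() {
>>                             @Override
>>                             public Map<String, String> load(String key) {
>>                                 return loadSchemas(); // placeholder for your fetch
>>                             }
>>                         });
>>
>>     @Setup
>>     public void setup() {
>>         // Runs once per DoFn instance, not per element, so warm the cache here.
>>         SCHEMAS.refresh("all");
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext c) {
>>         Map<String, String> schemas = SCHEMAS.getUnchecked("all");
>>         // ... look up the schema for c.element() and emit the parsed row ...
>>     }
>> }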
>>
>> On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <kevincp@google.com>
>> wrote:
>>
>>> Still gets stuck at the same place :/
>>>
>>> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun(上海_中台研发部_数据平台部_基础数据部_唐觊隽) <tangjijun@yhd.com> wrote:
>>>
>>>>
>>>>
>>>> .triggering(
>>>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
>>>>         .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>>>>
>>>>
>>>>
>>>> Try the trigger above
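>>>>
>>>> i.e. something like this on the schema read (sketch only, keeping your
>>>> Repeatedly.forever so the side input keeps updating):
>>>>
>>>> .apply(Window.<String>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(
>>>>                 AfterProcessingTime.pastFirstElementInPane()
>>>>                         .plusDelayOf(Duration.standardSeconds(1))))
>>>>         .discardingFiredPanes()
>>>>         .withAllowedLateness(Duration.ZERO))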
>>>>
>>>>
>>>>
>>>> From: Kevin Peterson [mailto:kevincp@google.com]
>>>> Sent: June 15, 2017 2:39
>>>> To: user@beam.apache.org
>>>> Subject: Fwd: Creating side input map with global window
>>>>
>>>>
>>>>
>>>> Hi all,
>>>>
>>>>
>>>>
>>>> I am working on a (streaming) pipeline which reads elements from
>>>> Pubsub, and schemas for those elements from a separate pubsub topic. I'd
>>>> like to be able to create a side input map from the schema topic, and have
>>>> that available to the main pipeline for parsing. Each message on the schema
>>>> pubsub topic contains all schemas I care about, so for every new message, I
>>>> want to generate a new map that will be available to the main pipeline
>>>> (eventual consistency is fine). I don't have any windows or triggers on the
>>>> main flow, since I really just want each element to be processed as it
>>>> arrives, using whatever the latest schema available is.
>>>>
>>>>
>>>>
>>>> I am currently trying this with:
>>>>
>>>>
>>>>
>>>> PCollection<KV<String, String>> schema = pipeline
>>>>         .apply("Read Schema",
>>>>                 PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>>>         .apply(Window.<String>into(new GlobalWindows()).triggering(
>>>>                 Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
>>>>         .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema())); // outputs around 100 elements for each input
>>>>
>>>>
>>>>
>>>> PCollectionView<Map<String, String>> schemaView =
>>>>         schema.apply(View.<String, String>asMap());
>>>>
>>>> pipeline
>>>>         .apply("Read Elements", PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>>>         .apply("Parse Elements",
>>>>                 ParDo.of(new DoFn<String, TableRow>() {
>>>>                     @ProcessElement
>>>>                     public void processElement(ProcessContext c) {
>>>>                         String name = getNameFromElement(c.element());
>>>>                         String schema = c.sideInput(schemaView).get(name);
>>>>                         c.output(parse(c, schema));
>>>>                     }
>>>>                 }).withSideInputs(schemaView))
>>>>         .apply("Write to Table", BigQueryIO.writeTableRows()); // Other BQ options not copied.
>>>>
>>>> When running this pipeline, the View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>>>> stage never emits any elements, and so the pipeline never progresses.
>>>> I can see the messages at the input stage, but nothing appears on the
>>>> output.
>>>>
>>>>
>>>>
>>>> Any advice?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> -Kevin
>>>>
>>>>
>>>>
>>>
>>>
>>
>
