beam-user mailing list archives

From Lukasz Cwik <lc...@google.com>
Subject Re: Re: Creating side input map with global window
Date Thu, 15 Jun 2017 13:26:27 GMT
Take a look at DoFn @Setup/@Teardown: they are called only once per DoFn instance
rather than once per element, which makes it easier to write initialization code.
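As a plain-Java sketch of the lifecycle described above (the real Beam API uses the @Setup, @ProcessElement, and @Teardown annotations on a DoFn subclass; the `SchemaParserFn` class and its "expensive" schema list here are hypothetical stand-ins):

```java
import java.util.ArrayList;
import java.util.List;

class SchemaParserFn {
    static int setupCalls = 0;          // instrumentation for this sketch only

    private List<String> schemas;       // stands in for an expensive-to-build client

    // In Beam this method would carry @Setup: runs once per DoFn instance.
    public void setup() {
        setupCalls++;
        schemas = new ArrayList<>();
        schemas.add("default-schema");  // expensive initialization goes here
    }

    // In Beam this method would carry @ProcessElement: runs once per element.
    public String processElement(String element) {
        return element + ":" + schemas.get(0);
    }

    // In Beam this method would carry @Teardown: runs once at shutdown.
    public void teardown() {
        schemas = null;
    }
}
```

The point is that the per-instance cost (building clients, loading schemas) is paid once in setup(), not on every element.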

Also if the schema map is shared, have you thought of using a single static
instance of Guava's LoadingCache shared amongst all the DoFn instances?

You can also refresh the data stored within the cache periodically.
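A minimal stand-in for that idea: with Guava you would build one static LoadingCache via CacheBuilder.newBuilder().refreshAfterWrite(...) and share it across all DoFn instances in the worker JVM. The sketch below approximates that behavior with a ConcurrentHashMap and a timestamp check, so it stays self-contained; the loader function and the 60-second refresh interval are assumptions for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SharedSchemaCache {
    private static final long REFRESH_MILLIS = 60_000;  // assumed refresh interval

    // Single static map shared by every DoFn instance in the JVM.
    private static final Map<String, Entry> CACHE = new ConcurrentHashMap<>();

    private static class Entry {
        final String schema;
        final long loadedAt;
        Entry(String schema, long loadedAt) { this.schema = schema; this.loadedAt = loadedAt; }
    }

    // The loader and clock are passed in so the sketch is testable;
    // a real LoadingCache would own the loader and use wall-clock time.
    static String get(String name, Function<String, String> loader, long nowMillis) {
        Entry e = CACHE.get(name);
        if (e == null || nowMillis - e.loadedAt > REFRESH_MILLIS) {
            e = new Entry(loader.apply(name), nowMillis);  // reload stale entry
            CACHE.put(name, e);
        }
        return e.schema;
    }
}
```

Entries are reloaded lazily on access once they are older than the refresh interval, which gives the eventual-consistency behavior described below without any side input or GroupByKey.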

On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <kevincp@google.com> wrote:

> Still gets stuck at the same place :/
>
> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun (唐觊隽, Shanghai / Mid-Platform R&D Dept / Data Platform Dept / Basic Data Dept) <
> tangjijun@yhd.com> wrote:
>
>
>>
>>
>> .triggering(
>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
>>         .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>>
>>
>>
>> Try the trigger above
>>
>>
>>
>> *From:* Kevin Peterson [mailto:kevincp@google.com]
>> *Sent:* June 15, 2017 2:39
>> *To:* user@beam.apache.org
>> *Subject:* Fwd: Creating side input map with global window
>>
>>
>>
>> Hi all,
>>
>>
>>
>> I am working on a (streaming) pipeline which reads elements from Pubsub,
>> and schemas for those elements from a separate pubsub topic. I'd like to be
>> able to create a side input map from the schema topic, and have that
>> available to the main pipeline for parsing. Each message on the schema
>> pubsub topic contains all schemas I care about, so for every new message, I
>> want to generate a new map that will be available to the main pipeline
>> (eventual consistency is fine). I don't have any windows or triggers on the
>> main flow, since I really just want each element to be processed as it
>> arrives, using whatever the latest schema available is.
>>
>>
>>
>> I am currently trying this with:
>>
>>
>>
>> PCollection<KV<String, String>> schema = pipeline
>>         .apply("Read Schema",
>>                 PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>         .apply(Window.<String>into(new GlobalWindows()).triggering(
>>                 Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
>>         .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));
>>         // outputs around 100 elements for each input
>>
>>
>>
>> PCollectionView<Map<String, String>> schemaView =
>>         schema.apply(View.<String, String>asMap());
>>
pipeline
>>         .apply("Read Elements",
>>                 PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>         .apply("Parse Elements", ParDo.of(new DoFn<String, TableRow>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 String name = getNameFromElement(c.element());
>>                 String schema = c.sideInput(schemaView).get(name);
>>                 c.output(parse(c, schema));
>>             }
>>         }).withSideInputs(schemaView))
>>         .apply("Write to Table", BigQueryIO.writeTableRows());
>>         // Other BQ options not copied.
>>
>> When running this pipeline, the View.AsMap/View.CreatePCollectionView/
>> Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>> stage never emits any elements, and so the pipeline never progresses. I
>> can see the messages at the input stage, but nothing appears on the output.
>>
>>
>>
>> Any advice?
>>
>>
>>
>> Thanks,
>>
>> -Kevin
>>
>>
>>
>
>
