beam-user mailing list archives

From amir bahmanyari <amirto...@yahoo.com>
Subject Re: Any data gets cached?
Date Tue, 02 Aug 2016 18:25:01 GMT
Hi Raghu,
Any opinion on this, please? I appreciate your time.
Cheers

      From: amir bahmanyari <amirtousa@yahoo.com>
 To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org> 
 Sent: Sunday, July 31, 2016 3:05 PM
 Subject: Re: Any data gets cached?
   
Hi Raghu,
Thanks so much for your response. Following is how I am reading unbounded records from Kafka through KafkaIO() and processing them in its corresponding inner class. How do I include "aggregation" in this call?
Have a great weekend.
    Pipeline p = Pipeline.create(options);
    // ... etc ...
    try {
        PCollection<KV<String, String>> kafkarecords = p
            .apply(KafkaIO.read().withBootstrapServers("kafhost:9092").withTopics(kaftopic)
                .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
            .apply(ParDo.named("startBundle").of(
                new DoFn<KV<byte[], String>, KV<String, String>>() {
                    // ... etc ...
                    @Override
                    public void processElement(ProcessContext ctx) throws Exception {
                        // ... etc ...
      From: Raghu Angadi <rangadi@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com> 
 Sent: Saturday, July 30, 2016 2:15 PM
 Subject: Re: Any data gets cached?
  
On the surface, it looks like you are asking about basic aggregations. These are of course provided by Beam too; almost all Beam examples make use of them. See 'Count.<String>perElement()' in the WordCount.java example. If not, please post either your Beam code or roughly equivalent SQL here.
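As a rough mental model only, the following plain-Java sketch (no Beam dependency; the class and method names are hypothetical) computes the same element-to-count mapping that a per-element count such as Count.perElement() produces in WordCount. It runs in a single JVM on an in-memory list, whereas the Beam transform is distributed over a PCollection.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical single-JVM illustration of "count occurrences of each element".
// It is not Beam code; it only shows the shape of the result.
class PerElementCountSketch {

    // Maps each distinct element to the number of times it appears.
    static Map<String, Long> countPerElement(List<String> elements) {
        return elements.stream()
            .collect(Collectors.groupingBy(
                Function.identity(),   // key: the element itself
                TreeMap::new,          // sorted map so printing is stable
                Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = List.of("kafka", "beam", "flink", "beam", "kafka", "beam");
        System.out.println(countPerElement(words)); // {beam=3, flink=1, kafka=2}
    }
}
```

In a Beam pipeline the equivalent step would be applied to the PCollection coming out of the ParDo, so the runner, rather than user-managed maps, owns the counting state.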
On Fri, Jul 29, 2016 at 4:26 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:

Hi Raghu,
Is it right to assume that if results are not aggregated, the final result may be inconsistent? What would be the best aggregation approach to guarantee consistency, even if there is a performance cost?
Thanks

      From: Raghu Angadi <rangadi@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com> 
 Sent: Friday, July 29, 2016 2:32 PM
 Subject: Re: Any data gets cached?
  
I suggest you try aggregating using Beam primitives (GroupByKey, Count, etc.) and see whether it still produces inconsistent results.
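The idea behind "GroupByKey, then aggregate" can be sketched in plain Java (hypothetical names, not Beam's implementation). Because the per-key reduction here (a sum) is associative and commutative, computing partial results on separate chunks of the input, as separate workers would, and merging them gives the same totals as processing everything at once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "group by key, then sum each group".
// Not Beam code: it only illustrates why the result does not depend on how
// the input is split across workers.
class GroupAndSumSketch {

    record KeyedValue(String key, long value) {}

    // Step 1: collect all values for each key (what GroupByKey conceptually yields).
    static Map<String, List<Long>> groupByKey(List<KeyedValue> input) {
        Map<String, List<Long>> groups = new HashMap<>();
        for (KeyedValue kv : input) {
            groups.computeIfAbsent(kv.key(), k -> new ArrayList<>()).add(kv.value());
        }
        return groups;
    }

    // Step 2: reduce each group with an associative, commutative operation.
    static Map<String, Long> sumPerKey(List<KeyedValue> input) {
        Map<String, Long> sums = new HashMap<>();
        groupByKey(input).forEach((key, values) ->
            sums.put(key, values.stream().mapToLong(Long::longValue).sum()));
        return sums;
    }

    // Combine partial results that were computed independently.
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> out = new HashMap<>(a);
        b.forEach((k, v) -> out.merge(k, v, Long::sum));
        return out;
    }

    public static void main(String[] args) {
        List<KeyedValue> all = List.of(
            new KeyedValue("x", 1), new KeyedValue("y", 5),
            new KeyedValue("x", 2), new KeyedValue("y", 1));
        Map<String, Long> whole = sumPerKey(all);
        Map<String, Long> split = merge(sumPerKey(all.subList(0, 1)), sumPerKey(all.subList(1, 4)));
        System.out.println(whole.equals(split)); // true
    }
}
```

An aggregation that depends on arrival order, or on state outside the pipeline, loses this property, which is one way run-to-run differences can appear.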
On Fri, Jul 29, 2016 at 12:09 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:

Sorry, colleagues. I am chasing a moving target in my Beam app running with FlinkRunner on a 2-node Flink cluster. Every run produces a different result, while we know what the result MUST be: it's an expected, fixed number. I checked, and Kafka is NOT sending any extra records. My first suspect was the thread-UNsafe Redis hashmap objects, so I replaced them with Java ConcurrentHashMaps.
Wow! It worked PERFECTLY for the very first time ever. I then re-ran the exact same thing, expecting the exact same result. But it's different. I ran it again: another different, wrong result. A different result each time, with no code change, nothing different.
I am wondering if some previous data/records get cached somewhere by the Beam/FlinkRunner/KafkaIO invocation, etc. Sorry for the long email. I'm losing my mind catching this moving target :)) I appreciate your kind feedback on this. Cheers, and have a great weekend.
Amir-
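The switch to ConcurrentHashMap described above addresses one real source of nondeterminism: lost updates when several threads in the same JVM do a read-modify-write on a shared map. A minimal sketch, assuming counts are kept in a hypothetical shared in-memory map, uses the atomic merge method. Note that this only helps within one JVM: each Flink TaskManager is a separate JVM process, so on a 2-node cluster each node has its own copy of such a map, which is one reason to move the counting into Beam's own aggregations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-JVM counter shared by several worker threads.
class SharedCounterSketch {

    static final Map<String, Long> COUNTS = new ConcurrentHashMap<>();

    // ConcurrentHashMap.merge() performs the read-modify-write atomically,
    // unlike a separate get() followed by put(), which can lose updates.
    static void record(String key) {
        COUNTS.merge(key, 1L, Long::sum);
    }

    // Runs `threads` workers, each recording `perThread` events, and returns the total.
    static long runConcurrently(int threads, int perThread) {
        COUNTS.clear();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    record("event");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return COUNTS.getOrDefault("event", 0L);
    }

    public static void main(String[] args) {
        // Prints 40000 with merge(); a plain HashMap with get/put may lose updates.
        System.out.println(runConcurrently(4, 10_000));
    }
}
```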