flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Parallelism and stateful mapping with Flink
Date Thu, 08 Dec 2016 15:23:35 GMT
@Aljoscha: I remember that someone else ran into this, too. Should we address arrays as keys
specifically in the API? Prohibit? Document this?
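A minimal reproduction of the underlying problem, outside Flink. Since keyBy routes records by the key's hashCode, a key type needs content-based `equals`/`hashCode`. JVM arrays don't have them: they inherit both from `java.lang.Object`. The object name `ArrayKeyDemo` is illustrative only. Converting to a `Vector[Byte]` is one content-based alternative shown here; the thread itself used a `String`.

```scala
import java.util.Arrays

// Plain Scala, no Flink needed: JVM arrays inherit equals/hashCode from
// java.lang.Object, so they compare and hash by reference, not by contents.
object ArrayKeyDemo {
  val a: Array[Byte] = Array[Byte](1, 2, 3)
  val b: Array[Byte] = Array[Byte](1, 2, 3) // same bytes, different object

  // Reference equality: false even though the contents match.
  val refEqual: Boolean = a == b

  // Content-based comparison and hashing must be requested explicitly.
  val contentEqual: Boolean = Arrays.equals(a, b)
  val contentHashesMatch: Boolean = Arrays.hashCode(a) == Arrays.hashCode(b)

  // An immutable Scala collection has element-wise equals/hashCode,
  // which is what a grouping key needs.
  val vecEqual: Boolean = a.toVector == b.toVector
  val vecHashesMatch: Boolean = a.toVector.hashCode == b.toVector.hashCode
}
```

Two arrays holding identical bytes are therefore different keys as far as hashing is concerned, which matches the symptom of equal keys landing on different subtasks.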

– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com) wrote:
> Sure!
>  
> (Aside: it turns out that the issue was using an `Array[Byte]` as a key. Byte arrays
> don't have a content-based hashCode, so two arrays with the same contents generally
> hash differently. I'll provide the skeleton for completeness, though.)
>  
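> import org.apache.flink.streaming.api.scala._
>  
> // Config, kafkaSource, transformToRecords, StatefulAggregator and hbaseSink are job-specific (not shown).
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(Config.callAggregator.parallelism)
>  
> env.addSource(kafkaSource)
>   .flatMap(transformToRecords(_))    // each Kafka message yields 0-2 records
>   .keyBy(b => new String(b.rowKey))  // rowKey is Array[Byte]; a String key hashes by content
>   .map(new StatefulAggregator())
>   .addSink(hbaseSink)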
>  
>  
> Again, wrapping the key in `new String(...)` inside my keyBy function fixed the issue. Thanks!
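One caveat on the `new String(b.rowKey)` workaround: that constructor decodes the bytes with the JVM's default charset. If row keys can contain arbitrary binary data (an assumption; the thread doesn't say), a decoding that isn't one-to-one can merge distinct keys into one. The sketch below, with the illustrative names `StringKeyDemo` and `latin1Key`, shows two non-UTF-8 byte arrays collapsing to the same string under UTF-8, while ISO-8859-1 maps each byte to exactly one char and round-trips losslessly. Hex or Base64 encoding would also be lossless.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

object StringKeyDemo {
  // Two different row keys; neither byte can appear in valid UTF-8.
  val k1: Array[Byte] = Array(0xFF.toByte)
  val k2: Array[Byte] = Array(0xFE.toByte)

  // The String(byte[], Charset) constructor replaces malformed input with
  // U+FFFD, so both keys decode to the same one-char string.
  val utf8Collide: Boolean =
    new String(k1, StandardCharsets.UTF_8) == new String(k2, StandardCharsets.UTF_8)

  // Hypothetical helper: ISO-8859-1 maps byte b to char (b & 0xFF), a bijection.
  def latin1Key(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.ISO_8859_1)

  val latin1Distinct: Boolean = latin1Key(k1) != latin1Key(k2)

  // Encoding back with the same charset restores the original bytes.
  val latin1RoundTrip: Boolean =
    Arrays.equals(latin1Key(k1).getBytes(StandardCharsets.ISO_8859_1), k1)
}
```

In the job above, that would mean keying with something like `b => new String(b.rowKey, StandardCharsets.ISO_8859_1)`; whether that matters depends on what the row keys actually contain.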
>  
> -a
>  
>  
>  
> > On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:  
> >
> > Hi,
> >
> > Could you maybe provide the (minimal) code for the problematic job? Also, are you
> > sure that the keyBy is working on the correct key attribute?
> >
> > Best,
> > Stefan
> >
> >> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>
> >> Hello,
> >>
> >> I’m trying to perform a stateful mapping of some objects coming in from Kafka in a
> >> parallelized Flink job (parallelism set on the job using env.setParallelism(3)). The data
> >> source is a Kafka topic, but the partitions aren’t meaningfully keyed for this operation
> >> (each Kafka message is flatMapped to between 0 and 2 objects, with potentially different
> >> keys). I have a keyBy() operator directly before my map(), but I’m seeing objects with the
> >> same key distributed to different parallel task instances, as reported by
> >> getRuntimeContext().getIndexOfThisSubtask().
> >>
> >> My understanding of keyBy is that it segments the stream by key and guarantees that all
> >> data with a given key hits the same instance. Am I possibly seeing residual “keying” from
> >> the Kafka topic?
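The expectation about keyBy holds as long as the key's hashCode is content-based; routing is derived from that hash, not from the Kafka partitioning. The sketch below is a deliberately simplified model, not Flink's algorithm: the hypothetical `subtaskFor` uses a plain `floorMod(hashCode, parallelism)`, whereas Flink's real partitioner may scramble the hash further before assigning a subtask. It only illustrates that equal String keys always agree on a subtask, while arrays carry identity hashes.

```scala
// Simplified, hypothetical model of hash partitioning (NOT Flink's exact code):
// a record goes to subtask floorMod(key.hashCode, parallelism).
object KeyRoutingDemo {
  def subtaskFor(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  val parallelism = 3

  // Two distinct String objects with equal contents share a hashCode,
  // so they are always routed to the same subtask.
  val stringKeysAgree: Boolean =
    subtaskFor(new String("row-42"), parallelism) == subtaskFor(new String("row-42"), parallelism)

  // Fresh arrays with identical bytes get identity hash codes, so their
  // subtasks can differ from one record to the next (not guaranteed either way).
  def arraySubtasks(n: Int): Set[Int] =
    (1 to n).map(_ => subtaskFor(Array[Byte](4, 2), parallelism)).toSet
}
```

Calling `KeyRoutingDemo.arraySubtasks(100)` will typically return more than one subtask index for what is logically a single key, which is the behavior described above.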
> >>
> >> I’m running Flink 1.1.3 with Scala. Please let me know if I can add more info.
> >>
> >> Thanks,
> >>
> >> Andrew
> >
>  
>  

