flink-user mailing list archives

From Andrew Roberts <arobe...@fuze.com>
Subject Re: Parallelism and stateful mapping with Flink
Date Wed, 07 Dec 2016 16:41:30 GMT

(Aside, it turns out that the issue was using an `Array[Byte]` as a key - JVM arrays don't
override hashCode/equals, so byte arrays hash by identity rather than by content. I'll
provide the skeleton for completeness, though.)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// (source construction elided)
  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
  .map(new StatefulAggregator())

Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!
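For anyone hitting the same thing, here's a minimal sketch (plain Scala, no Flink needed) of why `Array[Byte]` misbehaves as a key and why wrapping it in a `String` fixes it. `ByteArrayKeyDemo` and the sample bytes are made up for illustration:

```scala
object ByteArrayKeyDemo extends App {
  // Two byte arrays with identical contents.
  val k1 = Array[Byte](1, 2, 3)
  val k2 = Array[Byte](1, 2, 3)

  // JVM arrays inherit Object's identity-based equals/hashCode, so
  // equal-content arrays are neither equal nor guaranteed to hash alike.
  println(k1 == k2)            // false: reference comparison
  println(k1.sameElements(k2)) // true: contents do match

  // Wrapping the bytes in a String gives content-based equals/hashCode,
  // which is what a hash-partitioning keyBy needs to route equal keys
  // to the same subtask.
  println(new String(k1) == new String(k2)) // true
}
```

(Note that `new String(bytes)` uses the platform default charset, so this is only safe if the row keys are valid text in that encoding; otherwise something like a hex encoding of the bytes would be a more robust key.)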


> On Dec 7, 2016, at 11:28 AM, Stefan Richter <s.richter@data-artisans.com> wrote:
> Hi,
> could you maybe provide the (minimal) code for the problematic job? Also, are you
> sure that the keyBy is working on the correct key attribute?
> Best,
> Stefan
>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts <aroberts@fuze.com>:
>> Hello,
>> I’m trying to perform a stateful mapping of some objects coming in from Kafka in
a parallelized flink job (set on the job using env.setParallelism(3)). The data source is
a kafka topic, but the partitions aren’t meaningfully keyed for this operation (each kafka
message is flatMapped to between 0-2 objects, with potentially different keys). I have a keyBy()
operator directly before my map(), but I’m seeing objects with the same key distributed
to different parallel task instances, as reported by getRuntimeContext().getIndexOfThisSubtask().
>> My understanding of keyBy is that it would segment the stream by key, and guarantee
that all data with a given key would hit the same instance. Am I possibly seeing residual
“keying” from the kafka topic?
>> I’m running flink 1.1.3 in scala. Please let me know if I can add more info.
>> Thanks,
>> Andrew
