flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Parallelism and stateful mapping with Flink
Date Fri, 09 Dec 2016 04:05:59 GMT
I commented on the issue with a way that should work.

On Fri, 9 Dec 2016 at 01:00 Chesnay Schepler <chesnay@apache.org> wrote:

> Done. https://issues.apache.org/jira/browse/FLINK-5299
>
> On 08.12.2016 16:50, Ufuk Celebi wrote:
> > Would you like to open an issue for this for starters Chesnay? Would be
> good to fix for the upcoming release even.
> >
> >
> > On 8 December 2016 at 16:39:58, Chesnay Schepler (chesnay@apache.org)
> wrote:
> >> It would be neat if we could support arrays as keys directly; it should
> >> boil down to checking the key type and in case of an array injecting a
> >> KeySelector that calls Arrays.hashCode(array).
> >> This worked for me when i ran into the same issue while experimenting
> >> with some stuff.
> >>
> >> The batch API can use arrays as keys as well, so it's also a matter of
> >> consistency imo.
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 08.12.2016 16:23, Ufuk Celebi wrote:
> >>> @Aljoscha: I remember that someone else ran into this, too. Should we
> address arrays
> >> as keys specifically in the API? Prohibit? Document this?
> >>> – Ufuk
> >>>
> >>> On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com)
> wrote:
> >>>> Sure!
> >>>>
> >>>> (Aside, it turns out that the issue was using an `Array[Byte]` as a
> key - byte arrays
> >> don’t
> >>>> appear to have a stable hashCode. I’ll provide the skeleton for
> fullness, though.)
> >>>>
> >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>> env.setParallelism(Config.callAggregator.parallelism)
> >>>>
> >>>> env.addSource(kafkaSource)
> >>>> .flatMap(transformToRecords(_))
> >>>> .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >>>> .map(new StatefulAggregator())
> >>>> .addSink(hbaseSink)
> >>>>
> >>>>
> >>>> Again, wrapping my keyBy function in `new String()` has fixed my
> issue. Thanks!
> >>>>
> >>>> -a
> >>>>
> >>>>
> >>>>
> >>>>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> could you maybe provide the (minimal) code for the problematic job?
> Also, are you
> >> sure
> >>>> that the keyBy is working on the correct key attribute?
> >>>>> Best,
> >>>>> Stefan
> >>>>>
> >>>>>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts :
> >>>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> I’m trying to perform a stateful mapping of some objects coming
in
> from Kafka in a
> >> parallelized
> >>>> flink job (set on the job using env.setParallelism(3)). The data
> source is a kafka
> >> topic,
> >>>> but the partitions aren’t meaningfully keyed for this operation (each
> kafka message
> >>>> is flatMapped to between 0-2 objects, with potentially different
> keys). I have a keyBy()
> >>>> operator directly before my map(), but I’m seeing objects with the
> same key distributed
> >>>> to different parallel task instances, as reported by
> getRuntimeContext().getIndexOfThisSubtask().
> >>>>>> My understanding of keyBy is that it would segment the stream
by
> key, and guarantee
> >>>> that all data with a given key would hit the same instance. Am I
> possibly seeing residual
> >>>> “keying” from the kafka topic?
> >>>>>> I’m running flink 1.1.3 in scala. Please let me know if I
can add
> more info.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Andrew
> >>>>
> >>
> >>
> >
>
>

Mime
View raw message