flink-user mailing list archives

From Ron Crocker <rcroc...@newrelic.com>
Subject Re: Silly keyBy() error
Date Sun, 13 Mar 2016 03:04:42 GMT
Thanks Stefano -

That helped, but just led to different pain. I think I need to reconsider how I treat these things. Alas, the subject of a different thread.

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcrocker@newrelic.com
M: +1 630 363 8835

> On Mar 12, 2016, at 12:11 PM, Stefano Baghino <stefano.baghino@radicalbit.io> wrote:
> 
> Hi Ron,
> 
> not every class can be used as a key with `keyBy`. In your case in particular, it looks like you have to implement Comparable so that Flink can correctly key your stream based on AggregatableTimesliceImpl.
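Here is a minimal plain-Java sketch of that suggestion. In Ron's question the stream is declared as `AggregatableTimeslice` (the interface), not `AggregatableTimesliceImpl`. For that reason the sketch adds `Comparable` to the interface itself. Several details are assumptions: the comparison order (accountId, then agentId, then wideMetricId), the `SimpleTimeslice` class, and leaving out `getTimesliceStats()`. Key partitioning hashes keys, so `equals` and `hashCode` use the same fields as `compareTo`.

```java
import java.util.Comparator;
import java.util.Objects;

// Hypothetical, simplified version of the interface from this thread
// (getTimesliceStats() omitted). Extending Comparable makes the type
// itself usable as a key for a generic (non-POJO) type.
interface AggregatableTimeslice extends Comparable<AggregatableTimeslice> {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();

    // Assumed key order: accountId, then agentId, then wideMetricId.
    Comparator<AggregatableTimeslice> KEY_ORDER =
            Comparator.comparingInt(AggregatableTimeslice::getAccountId)
                    .thenComparingInt(AggregatableTimeslice::getAgentId)
                    .thenComparingLong(AggregatableTimeslice::getWideMetricId);

    @Override
    default int compareTo(AggregatableTimeslice other) {
        return KEY_ORDER.compare(this, other);
    }
}

// Hypothetical implementation. equals/hashCode use the same three fields
// as compareTo, so "same key" means the same thing everywhere.
final class SimpleTimeslice implements AggregatableTimeslice {
    private final int accountId;
    private final int agentId;
    private final long wideMetricId;

    SimpleTimeslice(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    @Override public int getAccountId() { return accountId; }
    @Override public int getAgentId() { return agentId; }
    @Override public long getWideMetricId() { return wideMetricId; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleTimeslice)) return false;
        SimpleTimeslice that = (SimpleTimeslice) o;
        return accountId == that.accountId
                && agentId == that.agentId
                && wideMetricId == that.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }
}
```

This approach makes the whole object the key. Flink still treats the type as generic, so named field expressions such as `"accountId"` may still fail to resolve against it.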
> 
> Take a look at the first slides here for more information on keying: http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html
> 
> Hope I helped.
> 
> On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <rcrocker@newrelic.com> wrote:
> I’m sure this should work, but I’m missing something… I searched the archive first, but didn’t have much luck finding any insights there.
> 
> TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
> 
> I’m just getting started with a Flink 1.0 implementation of a new task. It’s a pretty straightforward reduce job, but I’m running into a snag with creating a KeyedStream.
> 
> Here’s the graph:
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
>         DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(
>                 timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
>                 new TimesliceDeserializer(), kafkaConsumerProperties));
> 
>         SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
>                 .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
>                 .flatMap(new TimesliceMapper());
> 
>         flattenedDataStream
>                 .keyBy("accountId", "agentId", "wideMetricId")
>                 .timeWindow(Time.seconds(60))
>                 .reduce(AggregatableTimeslice::aggregateWith)
>                 .print();
> 
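The chain above keys each record by three fields, collects 60-second windows, and reduces each key's records with `aggregateWith`. For a single window, that amounts roughly to the plain-Java sketch below, which involves no Flink. It extracts an explicit composite key from each record instead of naming fields as strings. That is the same idea as handing Flink a `KeySelector`. `Metric`, `MetricKey`, `WindowSketch`, and the count-summing `aggregateWith` are hypothetical stand-ins for the thread's classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical composite key holding the three fields passed to keyBy(...).
final class MetricKey {
    final int accountId;
    final int agentId;
    final long wideMetricId;

    MetricKey(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MetricKey)) return false;
        MetricKey k = (MetricKey) o;
        return accountId == k.accountId && agentId == k.agentId && wideMetricId == k.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }
}

// Hypothetical stand-in for AggregatableTimeslice; "count" replaces the real stats.
final class Metric {
    final int accountId;
    final int agentId;
    final long wideMetricId;
    final long count;

    Metric(int accountId, int agentId, long wideMetricId, long count) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
        this.count = count;
    }

    // Explicit key extraction, analogous to a KeySelector's getKey(...).
    MetricKey key() {
        return new MetricKey(accountId, agentId, wideMetricId);
    }

    // Assumed merge rule: sum the counts of two records that share a key.
    Metric aggregateWith(Metric other) {
        return new Metric(accountId, agentId, wideMetricId, count + other.count);
    }
}

final class WindowSketch {
    // Groups one window's records by key and folds each group pairwise.
    static Map<MetricKey, Metric> reduceWindow(List<Metric> window) {
        return window.stream()
                .collect(Collectors.toMap(Metric::key, m -> m, Metric::aggregateWith));
    }
}
```

In Flink itself, the explicit-key route would be `keyBy` with a `KeySelector` that returns the three IDs (for example as a `Tuple3`). This avoids relying on Flink's field-name analysis of the element type.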
> This fails on keyBy() with the message:
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
> 
> TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely:
> public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
>     @Override
>     public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
>         for (Timeslice timeslice : value.getTimeslices()) {
>             out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
>         }
>     }
> }
> 
> AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
> public interface AggregatableTimeslice {
>     int getAccountId();
>     int getAgentId();
>     long getWideMetricId();
>     AggregatableTimesliceStats getTimesliceStats();
> }
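Field expressions such as `keyBy("accountId", "agentId", "wideMetricId")` only work when Flink can analyze the element type as a POJO. An interface like `AggregatableTimeslice` doesn't qualify, so Flink falls back to `GenericType`, as the exception shows. Flink's POJO rules call for a public, standalone class with a public no-argument constructor. Each non-static, non-transient field must be either public or reachable through a getter and setter. The sketch below shows one possible POJO shape. The class name `TimeslicePojo` and its fields are hypothetical, and the stats object is left out. The stream would also need to be declared with this concrete class for Flink to see it, for example by making the mapper a `FlatMapFunction<TimesliceData, TimeslicePojo>`.

```java
// Hypothetical POJO version of the timeslice type. In a real Flink job this
// class must be declared `public` (in its own TimeslicePojo.java file) to
// count as a POJO; the modifier is omitted here only so the sketch compiles
// as a single file.
class TimeslicePojo {
    // Public fields whose names match the keyBy field expressions.
    public int accountId;
    public int agentId;
    public long wideMetricId;

    // Public no-argument constructor, required for POJO types.
    public TimeslicePojo() {}

    public TimeslicePojo(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    @Override
    public String toString() {
        return "TimeslicePojo(" + accountId + ", " + agentId + ", " + wideMetricId + ")";
    }
}
```

The sketch uses plain public fields for brevity. Private fields with JavaBeans-style getters and setters satisfy the same rules.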
> Ron
> 
> -- 
> BR,
> Stefano Baghino
> 
> Software Engineer @ Radicalbit

