flink-user mailing list archives

From Ron Crocker <rcroc...@newrelic.com>
Subject Silly keyBy() error
Date Sat, 12 Mar 2016 20:01:49 GMT
I’m sure this should work, but I’m missing something… I searched the archive first, but
didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>)
cannot be used as key.

I’m just getting started with a 1.0 implementation of a new task. It’s a pretty straightforward
reduce job, but I’m running into a snag with creating a KeyedStream.

Here’s the graph:
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(
                timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
                new TimesliceDeserializer(), kafkaConsumerProperties));

        SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
                .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
                .flatMap(new TimesliceMapper());

        flattenedDataStream
                .keyBy("accountId", "agentId", "wideMetricId")
                .timeWindow(Time.seconds(60))
                .reduce(AggregatableTimeslice::aggregateWith)
                .print();

This fails on keyBy() with the message:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>,
namely
public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice>
{
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}
AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice
interface:
public interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
    AggregatableTimesliceStats getTimesliceStats();
}
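
For context, string field expressions such as "accountId" only work on types Flink can analyze as tuples or POJOs, and an interface falls back to GenericType. One possible way around that, offered as an unconfirmed sketch rather than a verified fix, is to build the composite key explicitly from the getters. The plain-Java example below shows only that key-extraction logic. TimesliceKey and its of() method are hypothetical names, and the interface is a trimmed stand-in for the one above. In Flink, the same logic would presumably live in a KeySelector passed to keyBy(), and returning a Tuple3 instead of a custom class may be safer given Flink's rules for key types.

```java
import java.util.Objects;

// Trimmed stand-in for the interface in the message (only the key getters).
interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
}

// Hypothetical composite key built from the three fields used in keyBy().
final class TimesliceKey {
    final int accountId;
    final int agentId;
    final long wideMetricId;

    TimesliceKey(int accountId, int agentId, long wideMetricId) {
        this.accountId = accountId;
        this.agentId = agentId;
        this.wideMetricId = wideMetricId;
    }

    // The extraction step: read the key fields through the interface getters.
    static TimesliceKey of(AggregatableTimeslice t) {
        return new TimesliceKey(t.getAccountId(), t.getAgentId(), t.getWideMetricId());
    }

    // Value-based equality so records with the same fields share a key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TimesliceKey)) return false;
        TimesliceKey other = (TimesliceKey) o;
        return accountId == other.accountId
                && agentId == other.agentId
                && wideMetricId == other.wideMetricId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, agentId, wideMetricId);
    }
}
```

Because the key is derived through the getters, this approach does not depend on how the concrete implementation class stores its fields.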
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcrocker@newrelic.com
M: +1 630 363 8835