I’m sure this should work, but I’m missing something… I searched the archive first, but didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

I’m just getting started with a 1.0 implementation of a new task. It’s a pretty straightforward reduce job, but I’m running into a snag with creating a KeyedStream.

Here’s the graph:
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME, new TimesliceDeserializer(), kafkaConsumerProperties));

        SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
                .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
                .flatMap(new TimesliceMapper());

        flattenedDataStream
                .keyBy("accountId", "agentId", "wideMetricId")
                .timeWindow(Time.seconds(60))
                .reduce(AggregatableTimeslice::aggregateWith)
                .print();

This fails on keyBy() with the message:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}
AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
public interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
    AggregatableTimesliceStats getTimesliceStats();
}
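
One direction that may be worth trying, sketched here under assumptions: keyBy() with field names needs a type that Flink can analyze as a POJO or tuple, and an interface is treated as a GenericType, so the key could instead be computed from the getters with a KeySelector. The sketch below is plain Java so that it runs without Flink on the classpath. The KeySelector interface in it is a local stand-in with the same shape as Flink's, and KeySelectorSketch, TIMESLICE_KEY, timeslice() and groupByKey() are hypothetical names that are not part of the job above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySelectorSketch {

    // Trimmed copy of the interface from this mail (stats getter omitted).
    interface AggregatableTimeslice {
        int getAccountId();
        int getAgentId();
        long getWideMetricId();
    }

    // Local stand-in with the same shape as Flink's KeySelector<IN, KEY>,
    // declared here only so the sketch compiles without Flink.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical selector: builds the composite key from the getters,
    // so no field-name lookup on the interface type is needed.
    static final KeySelector<AggregatableTimeslice, String> TIMESLICE_KEY =
            t -> t.getAccountId() + ":" + t.getAgentId() + ":" + t.getWideMetricId();

    // Hypothetical helper that creates a value for the demo below.
    static AggregatableTimeslice timeslice(int accountId, int agentId, long wideMetricId) {
        return new AggregatableTimeslice() {
            @Override public int getAccountId() { return accountId; }
            @Override public int getAgentId() { return agentId; }
            @Override public long getWideMetricId() { return wideMetricId; }
        };
    }

    // Groups values by their key; this only imitates the logical effect of
    // keying a stream and is not how Flink partitions data internally.
    static Map<String, List<AggregatableTimeslice>> groupByKey(
            List<AggregatableTimeslice> input) throws Exception {
        Map<String, List<AggregatableTimeslice>> groups = new LinkedHashMap<>();
        for (AggregatableTimeslice t : input) {
            groups.computeIfAbsent(TIMESLICE_KEY.getKey(t), k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    public static void main(String[] args) throws Exception {
        List<AggregatableTimeslice> input = new ArrayList<>();
        input.add(timeslice(1, 2, 3L));
        input.add(timeslice(1, 2, 3L));
        input.add(timeslice(1, 2, 4L));
        // Two timeslices share a key, so two groups are expected.
        System.out.println(groupByKey(input).keySet());
    }
}
```

In the actual job, the same key logic would presumably go into flattenedDataStream.keyBy(...) as an org.apache.flink.api.java.functions.KeySelector, written as an anonymous or named class so that Flink can extract the key type. A Tuple3<Integer, Integer, Long> key should work as well as the String used here.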
Ron
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835