Return-Path: X-Original-To: apmail-flink-user-archive@minotaur.apache.org Delivered-To: apmail-flink-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3AE3F19C4F for ; Sat, 12 Mar 2016 20:01:59 +0000 (UTC) Received: (qmail 1369 invoked by uid 500); 12 Mar 2016 20:01:58 -0000 Delivered-To: apmail-flink-user-archive@flink.apache.org Received: (qmail 1284 invoked by uid 500); 12 Mar 2016 20:01:58 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 1274 invoked by uid 99); 12 Mar 2016 20:01:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Mar 2016 20:01:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 4D16FC662F for ; Sat, 12 Mar 2016 20:01:58 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.298 X-Spam-Level: * X-Spam-Status: No, score=1.298 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H2=-0.001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=newrelic-com.20150623.gappssmtp.com Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id xdH4ULUU5Xkz for ; Sat, 12 Mar 2016 20:01:56 +0000 (UTC) Received: from mail-pf0-f182.google.com (mail-pf0-f182.google.com [209.85.192.182]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with ESMTPS id 127275FAD3 for ; Sat, 12 Mar 2016 20:01:55 +0000 (UTC) Received: by mail-pf0-f182.google.com with SMTP id x3so13443700pfb.1 for ; Sat, 12 Mar 2016 12:01:54 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=newrelic-com.20150623.gappssmtp.com; s=20150623; h=from:subject:message-id:date:to:mime-version; bh=7i5fhMXJ4nxYJDXeEwfWNa2OTQXT+pHLXHHynrGPujo=; b=r/t3suK2yfC9z4Vn6MTY9UoYTMqfjvQxnquKuWeiv3GUwlCXpotScPwY65ByGdPSLz lO42dBzvAcfwp/F0JGrxGwTPotO6jGCV87sOylC4YJGTCMlQFuLvJe6KzDj8gz2c8wW5 sAvvsR553FTJpxSNmeBAG9b0fPIh+TjGCoAv2Ietmp3hMQGMsqGSpnO1Li614yStpNnH ayRhd54RMp+/Pmx08XI6ssSx8AXGmeWkmYSwj1vMcB9+EX/6gCU5bQI3cstAFQnYTuqS otqj96m1N2hhtyAn2ubll49GFS6azixaUaaDSpnvO1M9arlD/j/d0Lwq+afhSabnbcEN 2WEw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:from:subject:message-id:date:to:mime-version; bh=7i5fhMXJ4nxYJDXeEwfWNa2OTQXT+pHLXHHynrGPujo=; b=eY4x93+uN4HRuTxO9H+H6a+jqfw7vJy6S47ouTqzq86jmENEVUfbHUVyplNOWTHpeS Nm6UBg5yeTQxiObo+t0ICfI4qt0JVHEJcx4CiwMfVZ8I8FA4u6o4DEBP1m5jxshUj9NE v6kn8yTlidwwvfc897P7zZjOJ71jB9yhII0QgZodv/YPQhAR2jBtSvHX2+LWUNmNwAFD YRhp17iYFN+v4Qjv6QVd5YPIW6qIFWSTfdSlIz9JdDDFHTjBujXjYMvpW9AWTZBknjCz KEfe+Hm8eJH7pm9bKKKeeNMrMvwBjcKZr6NNN03HT5szWSS4SgfqBZhVQ+laJWt193g6 jfEA== X-Gm-Message-State: AD7BkJLLNzIkVCMrLt/eJu/iml0qPoU7MIfe0bxBqJhFpDgUDrDxjn9/7PHTvekE8ePOi4Jk X-Received: by 10.98.16.210 with SMTP id 79mr17842903pfq.69.1457812913744; Sat, 12 Mar 2016 12:01:53 -0800 (PST) Received: from [192.168.2.123] (static-50-43-30-38.bvtn.or.frontiernet.net. [50.43.30.38]) by smtp.gmail.com with ESMTPSA id 9sm21561695pfm.10.2016.03.12.12.01.52 for (version=TLS1 cipher=ECDHE-RSA-AES128-SHA bits=128/128); Sat, 12 Mar 2016 12:01:52 -0800 (PST) From: Ron Crocker Content-Type: multipart/alternative; boundary="Apple-Mail=_2223395D-F614-4CD9-A42A-1BF696970174" Subject: Silly keyBy() error Message-Id: <3C94A20D-C6BF-4913-82C2-E333E16E1483@newrelic.com> Date: Sat, 12 Mar 2016 12:01:49 -0800 To: user@flink.apache.org Mime-Version: 1.0 (Mac OS X Mail 8.2 \(2104\)) X-Mailer: Apple Mail (2.2104) --Apple-Mail=_2223395D-F614-4CD9-A42A-1BF696970174 Content-Transfer-Encoding: quoted-printable Content-Type: text/plain; charset=utf-8 I=E2=80=99m sure this should work, but I=E2=80=99m missing something=E2=80= =A6 I searched the archive first, but didn=E2=80=99t have much luck = finding any insights there. TL;DR: org.apache.flink.api.common.InvalidProgramException: This type = (GenericType) = cannot be used as key. I=E2=80=99m just getting started with a 1.0 implementation of a new = task. It=E2=80=99s a pretty straightforward reduce job, but I=E2=80=99m = running into a snag with creating a KeyedStream. Here=E2=80=99s the graph: StreamExecutionEnvironment see =3D = StreamExecutionEnvironment.getExecutionEnvironment(); see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream dataStream =3D see.addSource(new = FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TO= PIC_NAME, new TimesliceDeserializer(), kafkaConsumerProperties)); SingleOutputStreamOperator = flattenedDataStream =3D dataStream .assignTimestampsAndWatermarks(new = TimesliceTimestampExtractor()) .flatMap(new TimesliceMapper()); flattenedDataStream .keyBy("accountId", "agentId", "wideMetricId") .timeWindow(Time.seconds(60)) .reduce(AggregatableTimeslice::aggregateWith) .print(); This fails on keyBy() with the message:=20 Exception in thread "main" = org.apache.flink.api.common.InvalidProgramException: This type = (GenericType) = cannot be used as key. TimesliceMapper is a concrete implementation of = FlatMapFunction, namely public class TimesliceMapper implements FlatMapFunction { @Override public void flatMap(TimesliceData value, = Collector out) throws Exception { for (Timeslice timeslice : value.getTimeslices()) { out.collect(new AggregatableTimesliceImpl(timeslice, value, = value.getAgentId())); } } } AggregatableTimesliceImpl is a simple concrete implementation of the = AggregatableTimeslice interface: public interface AggregatableTimeslice { int getAccountId(); int getAgentId(); long getWideMetricId(); AggregatableTimesliceStats getTimesliceStats(); } Ron =E2=80=94 Ron Crocker Principal Engineer & Architect ( ( =E2=80=A2)) New Relic rcrocker@newrelic.com M: +1 630 363 8835 --Apple-Mail=_2223395D-F614-4CD9-A42A-1BF696970174 Content-Transfer-Encoding: quoted-printable Content-Type: text/html; charset=utf-8
I=E2=80=99m sure this should work, but I=E2=80=99= m missing something=E2=80=A6 I searched the archive first, but didn=E2=80=99= t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This = type = (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) = cannot be used as key.

I=E2=80=99m just getting started with a = 1.0 implementation of a new task. It=E2=80=99s a pretty straightforward = reduce job, but I=E2=80=99m running into a snag with creating a = KeyedStream.

Here=E2=80=99s the graph:
        StreamExecutionEnvironment see =3D = StreamExecutionEnvironment.getExecutionEnvironment();
        = see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    =     DataStream<TimesliceData> dataStream =3D = see.addSource(new = FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KA= FKA_TOPIC_NAME, new TimesliceDeserializer(), = kafkaConsumerProperties));

        = SingleOutputStreamOperator<AggregatableTimeslice> = flattenedDataStream =3D dataStream
    =             = .assignTimestampsAndWatermarks(new = TimesliceTimestampExtractor())
    =             .flatMap(new = TimesliceMapper());

        = flattenedDataStream
          =       .keyBy("accountId", "agentId", = "wideMetricId")
          =       .timeWindow(Time.seconds(60))
                = .reduce(AggregatableTimeslice::aggregateWith)
                = .print();

This fails on keyBy() with the message: 
Exception in thread "main" = org.apache.flink.api.common.InvalidProgramException: This type = (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) = cannot be used as key.
TimesliceMapper =
is a concrete =
implementation of FlatMapFunction<TimesliceData, =
AggregatableTimeslice>, =
namely
public class =
TimesliceMapper implements FlatMapFunction<TimesliceData, =
AggregatableTimeslice> {
    @Override
    public void flatMap(TimesliceData value, =
Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, =
value.getAgentId()));
        }
    }
}
AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
public interface =
AggregatableTimeslice {
int = getAccountId();
int = getAgentId();
long = getWideMetricId();
AggregatableTimesliceStats = getTimesliceStats();
}
Ron
=E2=80=94
Ron Crocker
Principal = Engineer & Architect
( ( =E2=80=A2)) New = Relic
M: +1 630 363 = 8835

= --Apple-Mail=_2223395D-F614-4CD9-A42A-1BF696970174--