Date: Tue, 2 Aug 2016 05:09:36 -0700 (PDT)
From: "David B. Ciar"
To: user@flink.apache.org
Subject: Re: Window Functions with Incremental Aggregation
Message-ID: <1470139776457-8259.post@n4.nabble.com>

Hello again,

Having had another go at this today, I can clearly see that I cannot pass one type into the fold/window function and expect the window function to return a DataStream of another type.

I have tried a different approach and am now getting a run-time exception, caused by trying to use a composite case class as the fold accumulator value. My question now is whether this is possible and, if it is, how to fix the run-time exception.

Again, any help is appreciated.

The exception:

Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
    at org.management.observations.processing.jobs.QCBlockNull$$anon$6.<init>(QCBlockNull.scala:104)
    at org.management.observations.processing.jobs.QCBlockNull$.main(QCBlockNull.scala:104)
    at org.management.observations.processing.jobs.QCBlockNull.main(QCBlockNull.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

The code snippet is pasted below; a more neatly formatted version is in the Gist linked underneath it.

    // The cause of the exception is the .apply(...) below and the use of
    // IncrementalWindowPlaceholder. The fold and window classes return type
    // IncrementalWindowPlaceholder.
    val nullQCEvents1h = nullStream
      .keyBy("feature", "procedure")
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(30)))
      .apply(new IncrementalWindowPlaceholder(0, None, None, None),
             new QCFoldCounter(),
             new QCCheckNullAggregate())

    // The aggregate class I want to use with the fold/window function and
    // emit as the DataStream type:
    case class IncrementalWindowPlaceholder(foldedValue: Double,
                                            keys: Option[Tuple],
                                            startTime: Option[Long],
                                            endTime: Option[Long]) {
      override def toString: String =
        foldedValue.toString + ',' + keys.getOrElse('-') + ',' +
          startTime.getOrElse('-') + ',' + endTime.getOrElse('-')
    }

Also here: https://gist.github.com/dbciar/904e2d35d6aae30214666de1176f1d7c

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Functions-with-Incremental-Aggregation-tp8246p8259.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
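
For readers following the thread, here is a minimal plain-Scala sketch of what the .apply(initialValue, foldFunction, windowFunction) call in the snippet is meant to do: each element is folded into a single accumulator per key and window, and the window function then receives that one pre-folded value and fills in the key and the window bounds. It deliberately has no Flink dependency, so it only illustrates the semantics and does not reproduce or fix the ClassCastException. Several names are assumptions made for illustration: the Event type (the post does not show the element type of nullStream), the function bodies standing in for QCFoldCounter and QCCheckNullAggregate, and the use of a (String, String) pair for the keys in place of Flink's Tuple. Whether the Option[Tuple] field is what triggers the exception is not established here.

```scala
// Plain-Scala sketch of fold + window-function semantics (no Flink dependency).
// Assumption: keys are modelled as a (feature, procedure) pair of Strings
// instead of the Flink Tuple type used in the original post.
final case class IncrementalWindowPlaceholder(
    foldedValue: Double,
    keys: Option[(String, String)],
    startTime: Option[Long],
    endTime: Option[Long])

// Hypothetical element type; the original post does not show it.
final case class Event(feature: String, procedure: String, timestamp: Long)

object IncrementalFoldSketch {

  // Stand-in for QCFoldCounter: adds one per event and leaves the
  // key and window metadata untouched.
  def foldCounter(acc: IncrementalWindowPlaceholder, e: Event): IncrementalWindowPlaceholder =
    acc.copy(foldedValue = acc.foldedValue + 1)

  // Stand-in for QCCheckNullAggregate: receives the single pre-folded value
  // for one key and window, and fills in the key and the window bounds.
  def windowFunction(
      key: (String, String),
      start: Long,
      end: Long,
      folded: IncrementalWindowPlaceholder): IncrementalWindowPlaceholder =
    folded.copy(keys = Some(key), startTime = Some(start), endTime = Some(end))

  // Simulates one window [start, end) over all keys: filter by time,
  // group by key, fold each group, then apply the window function.
  def evaluateWindow(events: Seq[Event], start: Long, end: Long): Seq[IncrementalWindowPlaceholder] = {
    val initial = IncrementalWindowPlaceholder(0, None, None, None)
    events
      .filter(e => e.timestamp >= start && e.timestamp < end)
      .groupBy(e => (e.feature, e.procedure))
      .toSeq
      .sortBy(_._1) // deterministic output order by key
      .map { case (key, group) =>
        windowFunction(key, start, end, group.foldLeft(initial)(foldCounter))
      }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("a", "p", 10L),
      Event("a", "p", 20L),
      Event("b", "p", 15L),
      Event("a", "p", 5000L)) // falls outside the window below
    evaluateWindow(events, 0L, 3600L).foreach(println)
  }
}
```

The point of the sketch is that the accumulator and the emitted element share one type, so the fold function only updates foldedValue while the window function supplies the fields that are known only once the window fires.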