Date: Mon, 30 Nov 2015 14:47:21 +0100
From: Niels Basjes
To: user@flink.apache.org
Subject: Triggering events

Hi,

I'm experimenting with a custom windowing setup over clickstream data.
I want the timestamp of each clickstream event to be the time at which the event occurred, and I need the windows to trigger on these event times.

For testing I created a source roughly like this:

    public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {

        ctx.collectWithTimestamp(event, event.timestamp);

But none of the triggers were called, so I started digging through the code.
I then figured out that I apparently need to emit the watermarks myself, so I added this line:

        ctx.emitWatermark(new Watermark(event.timestamp));

But now I get:

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
    ... 9 more

This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or in my code?

What is the right way to trigger the events in my windowing setup?

P.S. I'm building my Java application against Flink version 0.10.1.

--
Best regards / Met vriendelijke groeten,

Niels Basjes
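To make the first symptom concrete (timestamps alone did not call any trigger, watermarks seemed to be needed), here is a minimal plain-Java sketch of the idea as I understand it. It is a toy model and uses no Flink classes: WatermarkSketch, onElement, onWatermark and fired are hypothetical names, the windows are fixed-size tumbling windows, and the firing rule (window end <= watermark) is an assumption made for illustration, not necessarily the exact rule Flink applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows. Hypothetical code, not Flink's API. */
final class WatermarkSketch {
    private final long windowSize;
    // Window start -> timestamps buffered for that window, ordered by start.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    // Windows that have fired, recorded as "start:elementCount".
    private final List<String> fired = new ArrayList<>();

    WatermarkSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers an event in its window. An element alone never fires a window. */
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
    }

    /** Fires every window whose end is at or before the watermark (assumed rule). */
    void onWatermark(long watermark) {
        while (!windows.isEmpty() && windows.firstKey() + windowSize <= watermark) {
            Map.Entry<Long, List<Long>> entry = windows.pollFirstEntry();
            fired.add(entry.getKey() + ":" + entry.getValue().size());
        }
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(10);
        sketch.onElement(1);
        sketch.onElement(5);
        sketch.onElement(12);
        System.out.println(sketch.fired()); // prints [] because no watermark was seen
        sketch.onWatermark(10);
        System.out.println(sketch.fired()); // prints [0:2]
    }
}
```

In this model event time only advances when a watermark arrives, which would explain why collectWithTimestamp on its own left the triggers silent. Whether emitting a watermark per event is the recommended approach is exactly what I am asking about.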
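For the StreamElement vs StreamRecord remark, the following plain-Java sketch shows the kind of failure I suspect. The nested classes are toy stand-ins that only borrow the names from the stack trace; they are not Flink's classes, and recordOnlySerialize and taggedSerialize are hypothetical methods. The sketch only illustrates how an unconditional cast to the record type breaks once a watermark travels the same path. It does not show why that serializer was selected, nor whether the fault is in Flink or in my code.

```java
/** Toy stand-ins named after the types in the stack trace. Not Flink's classes. */
final class CastSketch {
    static class StreamElement {}

    static final class StreamRecord extends StreamElement {
        final long value;
        StreamRecord(long value) {
            this.value = value;
        }
    }

    static final class Watermark extends StreamElement {
        final long timestamp;
        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /** Handles records only: the unconditional cast fails for any other element. */
    static String recordOnlySerialize(Object element) {
        StreamRecord record = (StreamRecord) element;
        return "R" + record.value;
    }

    /** Checks the element kind first and writes a distinct tag for each kind. */
    static String taggedSerialize(StreamElement element) {
        if (element instanceof Watermark) {
            return "W" + ((Watermark) element).timestamp;
        }
        return "R" + ((StreamRecord) element).value;
    }

    public static void main(String[] args) {
        System.out.println(taggedSerialize(new Watermark(42L))); // prints W42
        try {
            recordOnlySerialize(new Watermark(42L));
        } catch (ClassCastException e) {
            System.out.println("record-only path rejected the watermark");
        }
    }
}
```

If something like the record-only path is active in my job, then the exception would be a consequence of how the job is set up rather than of emitWatermark itself, but that is a guess on my side.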