From: "G.S.Vijay Raajaa" <gsvijayraajaa@gmail.com>
Date: Fri, 5 May 2017 01:31:11 +0530
Subject: Re: Window Function on AllWindowed Stream - Combining Kafka Topics
To: Aljoscha Krettek <aljoscha@apache.org>
Cc: user@flink.apache.org

I tried to reorder, and the window function works fine. However, after processing a few records from Topic A and Topic B, the window function seems to throw the error below. The keyBy is on the eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
        at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
        at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)

Regards,
Vijay Raajaa GS

On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:

> Thanks for your input, will try to incorporate it into my implementation.
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>
>> The approach could work, but if it can happen that an event from stream A
>> is not matched by an event in stream B, you will have lingering state that
>> never goes away. For such cases it might be better to write a custom
>> CoProcessFunction as sketched here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>
>> The idea is to keep events from each side in state and emit a result when
>> you get the event from the other side. You also set a cleanup timer in case
>> no other event arrives, to make sure that state eventually goes away.
>>
>> Best,
>> Aljoscha
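A minimal sketch of the CoProcessFunction idea described above could look like the following. The TopicAPojo/TopicBPojo types, the toJson()/merge() helpers and the 60-second cleanup timeout are assumptions made for illustration only, not code from this thread; depending on the Flink version, the rich variant (RichCoProcessFunction in 1.2) may be needed to access keyed state:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeJoin extends CoProcessFunction<TopicAPojo, TopicBPojo, String> {

    // Buffers, per key (eventTime), the element that arrived first.
    private transient ValueState<TopicAPojo> aState;
    private transient ValueState<TopicBPojo> bState;

    @Override
    public void open(Configuration parameters) {
        aState = getRuntimeContext().getState(new ValueStateDescriptor<>("topic-a", TopicAPojo.class));
        bState = getRuntimeContext().getState(new ValueStateDescriptor<>("topic-b", TopicBPojo.class));
    }

    @Override
    public void processElement1(TopicAPojo a, Context ctx, Collector<String> out) throws Exception {
        TopicBPojo b = bState.value();
        if (b != null) {
            // The matching B element is already buffered: emit the merged result and clear state.
            out.collect(merge(a, b));
            bState.clear();
        } else {
            // No match yet: buffer A and register a cleanup timer so the state cannot linger forever.
            aState.update(a);
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60_000L);
        }
    }

    @Override
    public void processElement2(TopicBPojo b, Context ctx, Collector<String> out) throws Exception {
        TopicAPojo a = aState.value();
        if (a != null) {
            out.collect(merge(a, b));
            aState.clear();
        } else {
            bState.update(b);
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60_000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // No partner arrived in time: drop whatever is buffered so state eventually goes away.
        aState.clear();
        bState.clear();
    }

    // Placeholder merge of the two sides into one JSON string.
    private String merge(TopicAPojo a, TopicBPojo b) {
        return "{\"topicA\":" + a.toJson() + ",\"topicB\":" + b.toJson() + "}";
    }
}

It would be wired in along the lines of streamA.connect(streamB).keyBy(a -> a.getEventTime(), b -> b.getEventTime()).process(new EventTimeJoin()), assuming the two topics are consumed as two separate streams rather than through one consumer on a topic list.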
>>
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
>>
>> Sure. Thanks for the pointer, let me reorder the same. Any comments about
>> the approach followed for merging topics and creating a single JSON?
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>
>>> Hi,
>>> An AllWindow operator requires an AllWindowFunction, instead of a
>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place;
>>> to get a keyed window you have to write something akin to:
>>>
>>> inputStream
>>>   .keyBy(…)
>>>   .window(…)
>>>   .apply(…) // or reduce()
>>>
>>> In your case, you key the stream and then the keying is "lost" again
>>> because you apply a flatMap(). That's why you have an all-window and not a
>>> keyed window.
>>>
>>> Best,
>>> Aljoscha
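Applied to the pipeline in the quoted question below, that reordering might look roughly like the following sketch, which continues from the stream variable and the Deserializer of the quoted snippet. It assumes getEventTime() returns a String key; the merge logic inside the window function is only a placeholder:

DataStream<TopicPojo> pojo = stream.map(new Deserializer());

DataStream<String> merged = pojo
        // Key on the event time of each element, so the two elements sharing
        // the same eventTime end up in the same keyed count window.
        .keyBy(new KeySelector<TopicPojo, String>() {
            @Override
            public String getKey(TopicPojo value) {
                return value.getEventTime();
            }
        })
        // Keyed count window of two: one element from Topic-A, one from Topic-B.
        .countWindow(2)
        // Because the stream is keyed, a keyed WindowFunction is the matching type here.
        .apply(new WindowFunction<TopicPojo, String, String, GlobalWindow>() {
            @Override
            public void apply(String key, GlobalWindow window,
                              Iterable<TopicPojo> elements, Collector<String> out) {
                StringBuilder fatJson = new StringBuilder("[");
                String sep = "";
                for (TopicPojo p : elements) {
                    fatJson.append(sep).append(p.toString()); // placeholder for real JSON merging
                    sep = ",";
                }
                out.collect(fatJson.append("]").toString());
            }
        });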
>>>
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to combine two Kafka topics using a single Kafka consumer on a
>>> list of topics, and then convert the JSON strings in the stream to POJOs.
>>> Then I join them via keyBy (on the event time field) and, to merge them into
>>> a single fat JSON, I was planning to use a windowed stream and apply a window
>>> function on it. The assumption is that Topic-A & Topic-B can be joined on
>>> event time and that only one pair (Topic A (JSON), Topic B (JSON)) will be
>>> present with the same eventTime. Hence I was planning to use a countWindow(2)
>>> after keyBy on eventTime.
>>>
>>> I have a couple of questions:
>>>
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the AllWindowed stream doesn't seem to work;
>>>    any pointers will be greatly appreciated.
>>>
>>> Code snippet:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>> properties.setProperty("group.id", "group-0011");
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>> List<String> names = new ArrayList<>();
>>> names.add("Topic-A");
>>> names.add("Topic-B");
>>>
>>> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>
>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List<String> where = new ArrayList<String>();
>>>
>>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction<String>() {
>>>
>>>   public void invoke(String value) throws Exception {
>>>     // Yet to be implemented - merge two POJOs into one
>>>   }
>>> });
>>>
>>> try {
>>>   env.execute();
>>> } catch (Exception e) {
>>>   return;
>>> }
>>> }
>>> }
>>>
>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   @Override
>>>   public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>     ObjectMapper mapper = new ObjectMapper();
>>>     out.collect(mapper.writeValueAsString(value));
>>>   }
>>> }
>>>
>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>
>>>   @Override
>>>   public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>       throws Exception {
>>>     int count = 0;
>>>     for (TopicPojo in : arg2) {
>>>       count++;
>>>     }
>>>     // Test result - to be modified
>>>     out.collect("Window: " + window + "count: " + count);
>>>   }
>>> }
>>>
>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   @Override
>>>   public TopicPojo map(String value) throws IOException {
>>>     ObjectMapper mapper = new ObjectMapper();
>>>     TopicPojo obj = null;
>>>     try {
>>>       System.out.println(value);
>>>       obj = mapper.readValue(value, TopicPojo.class);
>>>     } catch (JsonParseException e) {
>>>       throw new IOException("Failed to deserialize JSON object.");
>>>     } catch (JsonMappingException e) {
>>>       throw new IOException("Failed to deserialize JSON object.");
>>>     } catch (IOException e) {
>>>       throw new IOException("Failed to deserialize JSON object.");
>>>     }
>>>     return obj;
>>>   }
>>> }
>>>
>>> I am getting "The method apply(AllWindowFunction<String,R,GlobalWindow>)
>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable
>>> for the arguments (MyWindowFunction)" as an error.
>>>
>>> Kindly give your input.
>>>
>>> Regards,
>>> Vijay Raajaa GS
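For reference, the compile error quoted above is the mismatch Aljoscha points out: countWindowAll() produces an AllWindowedStream, whose apply() expects an AllWindowFunction (which has no key parameter), whereas MyWindowFunction implements the keyed WindowFunction interface. A non-keyed counterpart of the counting function, shown only to illustrate the signature that AllWindowedStream.apply() expects, might look like this sketch:

class MyAllWindowFunction implements AllWindowFunction<String, String, GlobalWindow> {

    @Override
    public void apply(GlobalWindow window, Iterable<String> values, Collector<String> out) throws Exception {
        // Same counting logic as MyWindowFunction above, but without the key parameter.
        int count = 0;
        for (String ignored : values) {
            count++;
        }
        out.collect("Window: " + window + " count: " + count);
    }
}

The input type is String because data_window carries the JSON strings produced by the Tokenizer; following Aljoscha's advice and keeping the stream keyed, however, makes the keyed WindowFunction the appropriate choice instead.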