Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D0397200C32 for ; Thu, 9 Mar 2017 19:58:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CD2E6160B75; Thu, 9 Mar 2017 18:58:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7D993160B5F for ; Thu, 9 Mar 2017 19:58:54 +0100 (CET) Received: (qmail 18288 invoked by uid 500); 9 Mar 2017 18:58:53 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 18278 invoked by uid 99); 9 Mar 2017 18:58:53 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Mar 2017 18:58:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1D5AFC0370 for ; Thu, 9 Mar 2017 18:58:53 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.994 X-Spam-Level: ** X-Spam-Status: No, score=2.994 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, MANY_SPAN_IN_TEXT=0.001, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001, URIBL_BLOCKED=0.001, URI_HEX=1.313] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (1024-bit key) header.d=reflektion.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id HtS6BE7wyFXO for ; Thu, 9 Mar 2017 18:58:48 +0000 (UTC) Received: from mail-qk0-f174.google.com (mail-qk0-f174.google.com [209.85.220.174]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 712CC5F39F for ; Thu, 9 Mar 2017 18:58:47 +0000 (UTC) Received: by mail-qk0-f174.google.com with SMTP id y76so134416560qkb.0 for ; Thu, 09 Mar 2017 10:58:47 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=reflektion.com; s=google; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=gFdYFdIP/jOKDUVosgyi72+2WqEAGSUXB7DGGDT9toU=; b=KhkEnFnZDz9aVq/v2pfFYKvEZ1K216ms/b8wX7Iv5vJFN4Yvv70pCBWPSmqAofx8KH CZfkJnByR4JA+n/8OA/lCG+ihV2q9s0TNYGqTYUNH+WSXJwKu1b0tIK9pEm6RtywezGI xqbNOknpHuyZTW4lwKrPWpW0+et4PgreuAHaY= X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=gFdYFdIP/jOKDUVosgyi72+2WqEAGSUXB7DGGDT9toU=; b=Tu8dXQcK5XorVDTxcLesaQoBvPXXEez7TwodK0exY3oOM2fa2yfglMplJDIhmnLSHP d0cgTLJwbXp+i04/d3o0icA5N0zjF58nKIsGBw/QrfTA9xLNp03sJy9r8oMZ7Brzdn3P BBROAbLBpeRD+atytuC6/gzFv+GxRioLy8xrIf2WgjGULY2j/3FHN0UXqsgnmgufN2I1 +eD02Ysn1GyWEe8SYMvvzZidz38NFnADOm3J3s4LAQ7juFT4jo1DsKnwM3IaVRyak8ox sE83/aCoDL+MMGwC6qjHUA4jWrB6hjUs5r0Qzsp1YNTmxH+uvVz+JSPaSqaxUjranxuH VBvQ== X-Gm-Message-State: AFeK/H0yTG0BJCvmOW1hJhBKdnUN+57wC091Qa2pQP1+vFTI3KPFFbVJ2cFEbyYpr7cV0dCzGBIL0mQKm3Ew0Qjs X-Received: by 10.55.22.66 with SMTP id g63mr14920164qkh.18.1489085926146; Thu, 09 Mar 2017 10:58:46 -0800 (PST) MIME-Version: 1.0 Received: by 10.12.147.152 with HTTP; Thu, 9 Mar 2017 10:58:45 -0800 (PST) In-Reply-To: <1489055210.4117489.905615248.14BB62C0@webmail.messagingengine.com> References: <1488938370843-12094.post@n4.nabble.com> <1489055210.4117489.905615248.14BB62C0@webmail.messagingengine.com> From: Sam Huang Date: Thu, 9 Mar 2017 10:58:45 -0800 Message-ID: Subject: Re: window function not working when control stream broadcast To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a1147734aab485c054a50d59d archived-at: Thu, 09 Mar 2017 18:58:56 -0000 --001a1147734aab485c054a50d59d Content-Type: text/plain; charset=UTF-8 Hi Aljoscha, Here's the code: private static class DataFilterFunImpl extends RichCoFlatMapFunction { private JSONParser parser; private Map> whiteListMap = new HashMap<>(); @Override // tuple5(domain, device_type, type, key, count_or_sum) public void flatMap1(KVTuple6 dataTuple, Collector collector) throws Exception { String type = dataTuple.f2; String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP); String key = keyValue[0]; switch (type) { case RawEventExtractor.Constants.VALUE_COUNT: { if (whiteListMap.containsKey(key)) { ControlJsonConfig ruleConfig = whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT); if (ruleConfig != null) { String value = keyValue.length > 1 ? keyValue[1] : ""; String bucket = ruleConfig.getBucketName(value); if (bucket != null) { dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3); collector.collect(dataTuple); } } else { collector.collect(dataTuple); } } break; } case RawEventExtractor.Constants.VALUE_SUM: { if (whiteListMap.containsKey(key) && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) { collector.collect(dataTuple); } break; } default: collector.collect(dataTuple); } } @Override public void flatMap2(String jsonStr, Collector collector) throws Exception { // Map> whiteListMap = whiteListMapState.value(); try { if (parser == null) { parser = new JSONParser(); } JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr); Tuple2> config = RawEventExtractor.getKeyConfig(jsonConfig); if (config.f1 == null) { whiteListMap.remove(config.f0); } else { whiteListMap.put(config.f0, config.f1); } } catch (Exception e) {} } } FYI, if I setParallelism of both the control stream and data stream, the window function works. Is it necessary to do so for broadcast() function? On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek wrote: > Hi Sam, > could you please also send the code for the DataFilterFunImpl and your > timestamps/watermark assigner. That could help in figuring out the problem. > > Best, > Aljoscha > > > On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote: > > Hi Timo, > > The window function sinks the data into InfluxDB, and it's not triggered. > If I comment the ".timeWindow", and print results after the reduce > function, it works > Code for window function is here: > > private static class WindowFunImpl implements WindowFunction { > @Override > public void apply(Tuple tuple, TimeWindow window, Iterable iterable, > Collector collector) throws Exception { > KVTuple6 kvTypeTuple = iterable.iterator().next(); > System.*out*.println("window: " + kvTypeTuple); // Doesn't work here if use broadcast > Point.Builder builder = Point.*measurement*(*INFLUXDB_MEASUREMENT*) > .time(window.getStart(), TimeUnit.*MILLISECONDS*) > .tag(*TAG_DOMAIN*, kvTypeTuple.f0) > .tag(*TAG_DEVICE*, kvTypeTuple.f1) > .tag(*TAG_TYPE*, kvTypeTuple.f2) > .tag(*TAG_KEY*, kvTypeTuple.f3) > .addField(*FIELD*, kvTypeTuple.f4); > > collector.collect(builder.build()); > } > } > > > On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther wrote: > > Hi Sam, > > could you explain the behavior a bit more? How does the window function > behave? Is it not triggered or what is the content? What is the result if > you don't use a window function? > > Timo > > > Am 08/03/17 um 02:59 schrieb Sam Huang: > > btw, the reduce function works well, I've printed out the data, and they > are > all correct. So are the timestamps and watermarks. And if I remove > ".broadcast()", the data is successfully sinked. > > Any help? > > > > -- > View this message in context: http://apache-flink-user-maili > ng-list-archive.2336050.n4.nabble.com/window-function-not- > working-when-control-stream-broadcast-tp12093p12094.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. > > > > --001a1147734aab485c054a50d59d Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Hi Aljoscha,

Here's the code:
=
privat=
e static class DataFilterFunImpl extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
private JSONParser parser;
private Map<String, Map<String, C= ontrolJsonConfig>> whiteListMa= p =3D new HashMap<&g= t;();

<= /span>@Override
// tuple5(domain, device_type, type, key, count_or_sum)
public void fla= tMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
String type =3D dataTuple.f2
;
= String[] keyValue =3D dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
String key =3D keyValue[0]= ;
switch (type) {
case RawEventExtractor.Constants.VALUE_COUNT
: { if (whiteListMap
.containsKey(key)) {<= br> ControlJsonConfig ruleConfig =3D whiteListMap.get(key).get(RawEventExtractor.= Constants.VALUE_CO= UNT);
if (ruleConfig !=3D = null) {
= String value =3D keyValue.l= ength > 1 ? keyValu= e[1] : "";<= br> = String bucket =3D ruleConfig.getBucketName(value);
= if (bucket !=3D null) {
dataTuple.setFie= ld(String.join(RawEventExtractor.C= onstants.DEFAULT_V= ALUE_SP, key, bucket), 3);
collector.collect(dataTuple);
}
} else
{
= collector.collect(dataTuple); }
}
break;
= }
case RawEventExtractor.Constants.VALUE_SUM: {
if (whiteListMap.containsKey(key) && whiteListMap.get(key).containsKey(RawEventExtracto= r.Constants.VALUE_= SUM)) {
collector.collect(dataTuple);
}
break;
}
default: collector.collect(dataTuple);
= }
}


@Override
<= /span>public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
// Map<String, Map<String, ControlJ= sonConfig>> whiteListMap =3D whiteListMapState.value();
try {
if (parser =3D=3D null) {
= parser =3D new JSONParser();
= }
JSONObject jsonConfig =3D (JSONObject) = parser.parse(jsonStr);
Tuple2<String, Map<String, ControlJsonConfig>> config =3D RawEventExtractor.getKeyConfig(jsonConfig);
= if (config.f1 =3D=3D <= span style=3D"color:rgb(204,120,50)">null
) {
= whiteListMap.remove(config.f0
);
= } else {
= whiteListMap.put(co= nfig.f0, config.f1);
}
} catch (Exception e) {}
}
}=

FYI, if I setParallelism of both the control stream and data stre= am, the window function works. Is it necessary to do so for broadcast() fun= ction?


On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <aljos= cha@apache.org> wrote:
<= /u>
Hi Sam,
could you please also send the code for the DataFilterFunImpl and your= timestamps/watermark assigner. That could help in figuring out the problem= .

Best,
Aljoscha


On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
Hi Timo,

The window function sinks the data into InfluxDB, and it's not tri= ggered.
If I comment the ".timeWindow", and print results after the = reduce function, it works
Code for window function is here:

private static class WindowFun= Impl implements WindowFunction<KVTuple6,= Point,Tuple,TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple= 6> iterable,
Collector<Point
> collector) throws Exception {
KVTuple6 kvTypeTuple =3D iterable.iterator().next();
System.out.println("window: &qu= ot; + kvTypeTuple); // Doesn't w= ork here if use broadcast
Point.Builder builder =3D Point.measurement(= INFLUXDB_MEASUREMENT)
.time(window.getStart(), TimeUnit.MILLI= SECONDS)
.tag(TAG_DOMAIN, kvTypeTuple= .f0)
.tag(TAG_DEVICE, kvTypeTuple= .f1)
.tag(TAG_TYPE, kvTypeTuple.<= span class=3D"m_-4701958911456779203colour" style=3D"color:rgb(152,118,170)= ">f2)
.tag(TAG_KEY, kvTypeTuple.f3)
.addField(FIELD, kvTypeTuple= .f4);

collector.collect= (builder.build());
}
}

On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther &l= t;twalthr@apache.or= g> wrote:
Hi Sam,

could you explain the behavior a bit more? How does the window functi= on behave? Is it not triggered or what is the content? What is the result i= f you don't use a window function?

Timo


Am 08/03/17 um 02:59 schrieb Sam Huang:

btw, the reduce function works w= ell, I've printed out the data, and they are
all correct. So are the timestamps and watermarks. And if I remove
".broadcast()", the data is successfully sinked.

Any help?



--
Sent from the Apache Flink User Mailing List archive. mailing list ar= chive at Nabble.com.



--001a1147734aab485c054a50d59d--