Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3B925200D53 for ; Tue, 5 Dec 2017 16:25:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3A1CE160C1B; Tue, 5 Dec 2017 15:25:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 32D17160C0A for ; Tue, 5 Dec 2017 16:25:22 +0100 (CET) Received: (qmail 17628 invoked by uid 500); 5 Dec 2017 15:25:21 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 17618 invoked by uid 99); 5 Dec 2017 15:25:21 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Dec 2017 15:25:20 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 39BE01A16BD for ; Tue, 5 Dec 2017 15:25:20 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.177 X-Spam-Level: * X-Spam-Status: No, score=1.177 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_HELO_PASS=-0.001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd2-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=citi.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id z3OKl8NHUokJ for ; Tue, 5 Dec 2017 15:25:17 +0000 (UTC) Received: from mx-a.mail.citi.com (mx-a.mail.citi.com [67.231.145.106]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 9E4BC5F2A8 for ; Tue, 5 Dec 2017 15:25:16 +0000 (UTC) Received: from pps.filterd (m0083709.ppops.net [127.0.0.1]) by mx0a-00123c02.pphosted.com (8.16.0.21/8.16.0.21) with SMTP id vB5FKtv1035135 for ; Tue, 5 Dec 2017 15:25:09 GMT DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=citi.com; h=from : to : subject : date : message-id : content-type : mime-version; s=maila; bh=EhPc4nn/wRSK4JCPGNbJUCkQcQcwETh0JOQrUAYUv6g=; b=PP8HEd1yV8hr9sMRAcT9xyPoFvqq+r9hjlYdMeke+H7dgqtPLHh3D3MWm6V9GKIsWlf+ cS8QmLIUUDXofkUmDiAS/W/7nllbqm+f//uoD6M09prAkO/E9+Llfad+Qs4AoRtSY3GW vq8ctgG45V9y+lx7AJWwSIQg5wmE+eEIK/hJYwAKAFp5NM3/SP0zXNhd9H+3TgQXzA/W yOEDfzrplhdor8568f8OHSoJzBlbwPWVVu/E0T7atRLx6EEAYv88DIuNpIB3dF2GpSWU VcybPH1IWLc1h/tsTCiDNEMoUNsdTgdW/cwb+ca1rXKm7Mg0WBiCCqPMRCXay2lmdbFD /A== Received: from mail.citigroup.com (smtpoutbound.citigroup.com [192.193.193.15]) by mx0a-00123c02.pphosted.com with ESMTP id 2ekjdhu6vx-1 (version=TLSv1.2 cipher=AES256-SHA bits=256 verify=NOT) for ; Tue, 05 Dec 2017 15:25:08 +0000 Received: from imbhub-mw34.nam.nsroot.net (imbhub-mw34.nam.nsroot.net [144.215.196.152]) by smtpinbound.citigroup.com (Sentrion-MTA-4.3.1/Sentrion-MTA-4.2.2) with ESMTP id vB5FP7UC016344 for ; Tue, 5 Dec 2017 15:25:07 GMT Received: from imbdlprt-mw05.nam.nsroot.net (imbdlprt-mw05.nam.nsroot.net [144.215.116.194]) by imbhub-mw34.nam.nsroot.net (Sentrion-MTA-4.3.1/Sentrion-MTA-4.2.2) with ESMTP id vB5FP6dm000998 for ; Tue, 5 Dec 2017 15:25:06 GMT Received: from imbdlpbuf-gt01.nam.nsroot.net (namdlpdimpsw14.nam.nsroot.net [169.177.230.121]) by imbdlprt-mw05.nam.nsroot.net (Sentrion-MTA-4.3.1/Sentrion-MTA-4.2.2) with ESMTP id vB5FP5nW015560 for ; Tue, 5 Dec 2017 15:25:06 GMT Received: from EXLNIHT09.eur.nsroot.net (EXLNIHT09.eur.nsroot.net [169.182.85.59]) by imbdlpbuf-gt01.nam.nsroot.net (Sentrion-MTA-4.3.1/Sentrion-MTA-4.2.2) with ESMTP id vB5FOwkv012138 for ; Tue, 5 Dec 2017 15:25:05 GMT Received: from EXLNHT06.eur.nsroot.net (169.182.86.153) by EXLNIHT09.eur.nsroot.net (169.182.85.59) with Microsoft SMTP Server (TLS) id 14.3.361.1; Tue, 5 Dec 2017 15:25:04 +0000 Received: from EXLNMB46.eur.nsroot.net ([169.254.6.109]) by EXLNHT06.eur.nsroot.net ([169.182.86.153]) with mapi id 14.03.0361.001; Tue, 5 Dec 2017 15:25:05 +0000 From: "Sofer, Tovi " To: "user@flink.apache.org" Subject: slot group indication per operator Thread-Topic: slot group indication per operator Thread-Index: AdNt3S6khFThg3r2QyyEsj1oIO4Qgw== Date: Tue, 5 Dec 2017 15:25:03 +0000 Message-ID: <981577EAC00F8245AB0ABE8CAA3EBC4812EE291A@exlnmb46.eur.nsroot.net> Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-originating-ip: [169.182.87.212] x-wiganss: 01000000010017EXLNHT06.eur.nsroot.net ID0042<981577EAC00F8245AB0ABE8CAA3EBC4812EE291A@exlnmb46.eur.nsroot.net> Content-Type: multipart/alternative; boundary="_000_981577EAC00F8245AB0ABE8CAA3EBC4812EE291Aexlnmb46eurnsro_" MIME-Version: 1.0 X-CFilter-Loop: Reflected X-Proofpoint-Virus-Version: vendor=fsecure engine=2.50.10432:,, definitions=2017-12-05_05:,, signatures=0 archived-at: Tue, 05 Dec 2017 15:25:23 -0000 --_000_981577EAC00F8245AB0ABE8CAA3EBC4812EE291Aexlnmb46eurnsro_ Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: quoted-printable Hi all, I am trying to use the slot group feature, by having 'default' group and ad= ditional 'market' group. The purpose is to divide the resources equally between two sources and thei= r following operators. I've set the slotGroup on the source of the market data. Can I assume that all following operators created from this source will use= same slot group of 'market'? (The operators created for market stream are pretty complex, with connect a= nd split). In Web UI I saw there are 16 slots, but didn't see indication per operator = to which group it was assigned. How can I know? Relevant Code: env.setParallelism(8); conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); \\ to all= ow Parallelism of 8 per group // Market source and operators: KeyedStream windowedStreamA =3D sourceProvider.provid= e(env) .name(spotSourceProvider.getName()) .slotSharingGroup(SourceMsgType.MARKET.slotGroup()) .flatMap(new ParserMapper(new MarketMessageParser())) .name(ParserMapper.class.getSimpleName()) .filter(new USDFilter()) .name(USDFilter.class.getSimpleName()) .keyBy(MarketEvent.CURRENCY_FIELD) .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS)) .process(new LastInWindowPriceChangeFunction())) .name(LastInWindowPriceChangeFunction.class.getSimpleName()) .keyBy(SpotTickEvent.CURRENCY_FIELD); marketConnectedStream =3D windowedStreamA.connect(windowedStreamB) .flatMap(new MarketCoMapper())) .name(MarketCoMapper.class.getSimpleName()) SplitStream stocksWithSpotsStreams =3D marketConnectedS= tream .split( market -> ImmutableList.of("splitA"," splitB") ); DataStream< MarketAWithMarketB> splitA =3D stocksWithSpotsStreams.select("s= plitA "); Thanks and regards, Tovi --_000_981577EAC00F8245AB0ABE8CAA3EBC4812EE291Aexlnmb46eurnsro_ Content-Type: text/html; charset="us-ascii" Content-Transfer-Encoding: quoted-printable

Hi all,

I am trying to use the slot group feature, by having R= 16;default’ group and additional ‘market’ group.

The purpose is to divide the resources equally between tw= o sources and their following operators.

I’ve set the slotGroup on the source of the market = data.

Can I assume that all following operators created from th= is source will use same slot group of ‘market’?

(The operators created for market stream are pretty compl= ex, with connect and split).

In Web UI I saw there are 16 slots, but didn’t see = indication per operator to which group it was assigned. How can I know?

Relevant Code:
env.setParallelism(8);

conf.setInteger(Conf= igConstants.TASK_MANAGER_NUM_TASK_SLOTS<= span style=3D"font-size:9.0pt;font-family:"Courier New";color:bla= ck">, 16); \\ to allow Parallelism of 8 per group

 

// Market source and= operators:

KeyedStream<SpotTickEvent, Tuple> windowedStreamA =3D sourceProvider.provide(env)
   &nbs= p;    .name(
spotSourceProvider.getName())
        .slotSharin= gGroup(SourceMsgType.
MARKET.slot= Group())
        .flatMap(
<= span style=3D"font-size:9.0pt;color:navy">new ParserMapper(new MarketMessageParser()))
        .nam= e(ParserMapper.
class.getSimpleName())
&= nbsp;       .filter(
new USDFilter())
        .nam= e(USDFilter.
class.getSimpleName())
&nbs= p;       .keyBy(MarketEvent.
CURRENCY_FIELD)
     =    .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS<= span style=3D"font-size:9.0pt;color:black">))
    &n= bsp;   .process(new LastInWindow= PriceChangeFunction()))
        .name= (LastInWindowPriceChangeFunction.
class.get= SimpleName())
        .keyBy(SpotTick= Event.
CURRENCY_F= IELD);

 

marketConnectedStream =3D windowedStreamA.connect(windowedStreamB)
&nbs= p;           .flatMap(new MarketCoMapper()))
   &= nbsp;        .name(MarketCoMapper.class.getSimpleName())
 
SplitStream<MarketAWithMarketB> stocksWithSpotsStreams =3D marketCon=
nectedStream
        .split( market -= > ImmutableList.of(
"splitA"," splitB&= quot;) );

Dat= aStream< MarketAWithMarketB> splitA =3D stocksWithSpotsStreams.select= (
"splitA "<= /span>);<= /pre>

 

 

Thanks and regards,

Tovi

 

 

--_000_981577EAC00F8245AB0ABE8CAA3EBC4812EE291Aexlnmb46eurnsro_--