From: Piotr Nowojski
To: Di Tang
Cc: user@flink.apache.org
Subject: Re: How to broadcast messages to all task manager instances in cluster?
Date: Fri, 11 May 2018 13:30:59 +0200

Hi,

I don’t quite understand your problem. If you broadcast the configuration messages as an input to the operator that depends on this configuration, each instance of that operator will receive them. It shouldn't matter whether Flink scheduled your operator on one, some, or all of the TaskManagers. All that should matter is that the operators running your configuration-sensitive code receive the broadcast messages.

    DataStream<> input = xxx;
    DataStream<> controlConfigInput = yyy;

    DataStream<> data = input
        .do()
        .something()
        .fancy();

    controlConfigInput.broadcast()
        .connect(data)
        .flatMap(new MyFancyOperatorThatDependsOnConfigStream());

Or see slide 36 here:
https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics

Piotrek

> On 11 May 2018, at 11:11, Di Tang wrote:
>
> Hi guys:
>
> I have a Flink job which contains multiple pipelines. Each pipeline depends on some configuration. I want the configuration to be dynamic and to take effect as soon as it changes, so I created a data source which periodically polls the database storing the configuration. However, how can I broadcast the events to all task manager instances? datastream.broadcast() only applies to the parallel instances of an operator, and I don't want to connect the configuration data source to each pipeline because that is too verbose. If Flink cannot explicitly broadcast messages to task managers, is there any method to guarantee that the parallel operator is distributed across all task managers?
>
> Thanks,
> Di
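
To illustrate the point made in the reply (a broadcast element reaches every parallel instance of the operator, wherever those instances happen to be scheduled), here is a toy model in plain Java. It is only a sketch and does not use the Flink API: the names BroadcastSketch, OperatorInstance, onConfig and onData, the TaskManager labels, and the round-robin delivery of data records are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (plain JDK, NOT the Flink API) of a broadcast control stream. */
public class BroadcastSketch {

    /** One parallel instance of a config-dependent operator (hypothetical). */
    static final class OperatorInstance {
        final int subtask;
        final String taskManager; // where the scheduler happened to place it
        final List<String> output = new ArrayList<>();
        private String config = "default";

        OperatorInstance(int subtask, String taskManager) {
            this.subtask = subtask;
            this.taskManager = taskManager;
        }

        /** Control input: remember the latest configuration. */
        void onConfig(String newConfig) {
            config = newConfig;
        }

        /** Data input: tag the record with the configuration in effect. */
        void onData(String record) {
            output.add(record + "@" + config);
        }
    }

    /** Broadcast: every instance receives the same configuration element. */
    static void broadcast(List<OperatorInstance> instances, String config) {
        for (OperatorInstance instance : instances) {
            instance.onConfig(config);
        }
    }

    /** Assumed round-robin: each data record goes to exactly one instance. */
    static void distribute(List<OperatorInstance> instances, List<String> records) {
        for (int i = 0; i < records.size(); i++) {
            instances.get(i % instances.size()).onData(records.get(i));
        }
    }

    /** Three subtasks placed on two of three TaskManagers; "tm-3" hosts none. */
    static List<OperatorInstance> demo() {
        List<OperatorInstance> instances = new ArrayList<>();
        instances.add(new OperatorInstance(0, "tm-1"));
        instances.add(new OperatorInstance(1, "tm-1"));
        instances.add(new OperatorInstance(2, "tm-2"));

        distribute(instances, Arrays.asList("a", "b", "c")); // before the update
        broadcast(instances, "v2");                          // configuration change
        distribute(instances, Arrays.asList("d", "e", "f")); // after the update
        return instances;
    }

    public static void main(String[] args) {
        for (OperatorInstance op : demo()) {
            System.out.println("subtask " + op.subtask + " on " + op.taskManager
                    + ": " + op.output);
        }
    }
}
```

In this model every subtask applies "v2" to the records it handles after the update, even though no instance runs on "tm-3". The placement of instances on TaskManagers does not affect which instances see the configuration.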
