From: Nico Kruber
To: Evgeny Kincharov
Cc: "user@flink.apache.org"
Subject: Re: Running streaming job on every node of cluster
Date: Mon, 27 Feb 2017 18:40:48 +0100
Organization: data Artisans

What about setting the parallelism[1] to the total number of slots in
your cluster?

By default, all operators of your program are put into the same slot
sharing group[2]. With the parallelism set that way, each slot in the
cluster would hold one parallel instance of your whole program (except
for operators that have a lower or higher parallelism), if I understand
it correctly.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/parallel.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#task-chaining-and-resource-groups

On Monday, 27 February 2017 18:17:53 CET Evgeny Kincharov wrote:
> Thanks for your answer.
> The problem is that both slots are taken on the same node, provided
> that node has enough free slots, of course. The other nodes stay idle.
> I would like to utilize the cluster resources a bit more. Maybe one of
> the other deployment modes allows it.
>
> BR, Evgeny.
>
> From: Nico Kruber
> Sent: 27 February 2017, 20:07
> To: user@flink.apache.org
> Cc: Evgeny Kincharov
> Subject: Re: Running streaming job on every node of cluster
>
> Hi Evgeny,
> I tried to reproduce your example with the following code, with another
> console listening via "nc -l 12345":
>
> env.setParallelism(2);
> env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(final String s) throws Exception {
>             return s; // identity map, only there to have a parallel operator
>         }
>     })
>     .addSink(new DiscardingSink<String>());
>
> This way, I do get a source with parallelism 1 and map & sink with
> parallelism 2, and the whole program occupying 2 slots as expected. You
> can check in the web interface of your cluster how many slots are taken
> after executing one instance of your program.
>
> How do you set your parallelism?
>
>
> Nico
>
> On Monday, 27 February 2017 14:04:21 CET Evgeny Kincharov wrote:
>
> > Hi,
> >
> > I have the simplest streaming job, and I want to distribute my job on
> > every node of my Flink cluster.
> >
> > The job is simple:
> >
> > source (SocketTextStream) -> map -> sink (AsyncHtttpSink).
> >
> > When I increase the parallelism of my job, either when deploying or
> > directly in code, it has no effect because the source can't work in
> > parallel. For now I have reduced "Task Slots" to 1 on every node and
> > deploy my job as many times as there are nodes in the cluster. That
> > works when I have only one job. If I want to deploy another one in
> > parallel, there are no free slots. I hope a more convenient way to do
> > that exists. Thanks.
> >
> > BR,
> > Evgeny
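The slot arithmetic behind the suggestion in this thread can be sketched in plain Java. This is only a toy model, not Flink API: the class and method names are made up for illustration, and the cluster size (3 nodes with 2 slots each) is an assumed example. It relies on the default behaviour described above, where all operators share one slot sharing group, so a job should need as many slots as its highest operator parallelism.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of slot usage under default slot sharing (hypothetical names, no Flink dependency).
class SlotSharingSketch {

    // Total slots offered by a cluster where every node has the same number of task slots.
    public static int totalSlots(int nodes, int slotsPerNode) {
        return nodes * slotsPerNode;
    }

    // With all operators in one slot sharing group, the job needs
    // as many slots as the highest parallelism among its operators.
    public static int requiredSlots(Map<String, Integer> operatorParallelism) {
        int max = 0;
        for (int parallelism : operatorParallelism.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // Assumed example cluster: 3 nodes, 2 task slots each.
        int clusterSlots = totalSlots(3, 2);

        // source -> map -> sink, with job parallelism set to the total slot count.
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("socket source", 1);   // the socket source stays non-parallel
        job.put("map", clusterSlots);
        job.put("sink", clusterSlots);

        System.out.println("slots in cluster: " + clusterSlots);
        System.out.println("slots the job needs: " + requiredSlots(job));
    }
}
```

Under these assumptions both numbers come out equal, so one job submission would occupy every slot. The earlier experiment in the thread matches the same rule: source 1, map 2, sink 2 took 2 slots. Which nodes those slots end up on is decided by the scheduler and is not modelled here.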