From: Yunus Olgun <yunolgun@gmail.com>
Subject: Re: CustomPartitioner that simulates ForwardPartitioner and watermarks
Date: Wed, 27 Sep 2017 18:32:20 +0200
To: Kostas Kloudas <k.kloudas@data-artisans.com>
Cc: user@flink.apache.org

Hi Kostas,

Yes, you have summarized it well. I want to forward the data only to the next local operator, but broadcast the watermark through the cluster.

- I can't set the parallelism of taskB to 1. The stream is too big for that. Also, the data is ordered within each partition, and I don't want to change that order.

- I don't need a KeyedStream. taskA and taskB will always have the same parallelism as each other, but this parallelism may be increased in the future.

The use case is: the source is Kafka. At our peak hours, or when we want to run the streaming job on old data from Kafka, the same thing always happens, even in trivial jobs. Some consumers consume faster than others. They produce too much data downstream, but the watermark advances slowly, at the speed of the slowest consumer. This extra data piles up at the downstream operators. When the downstream operator is an aggregation, that is fine. But when it is an in-Flink join, the state size gets too big, checkpoints take much longer, and the job overall becomes slower or fails. It also affects other jobs on the cluster.

So, basically I want to implement a throttler. It compares the timestamp of a record with the global watermark. If the difference is larger than a constant threshold, it starts sleeping 1 ms for each incoming record. This way, fast operators wait for the slowest one.
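Roughly, the idea in code would be a simplified sketch like the one below (the class name and threshold parameter are illustrative, and it assumes event timestamps are assigned upstream so that both the element timestamp and the current watermark are available):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the throttling idea, not production code.
public class ThrottlingFunction<T> extends ProcessFunction<T, T> {

    private final long maxAheadMillis;  // how far a record may run ahead of the watermark

    public ThrottlingFunction(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        Long timestamp = ctx.timestamp();                        // event timestamp, if assigned
        long watermark = ctx.timerService().currentWatermark();  // watermark seen by this subtask
        // If this subtask runs too far ahead of the watermark, back off 1 ms per record
        // so that the slowest upstream partition can catch up.
        if (timestamp != null && timestamp - watermark > maxAheadMillis) {
            Thread.sleep(1);
        }
        out.collect(value);
    }
}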

The only problem is that this solution comes at the cost of one network shuffle and data serialization/deserialization. Since the stream is large, I want to at least avoid the network shuffle.

I thought operator instances within a taskmanager would get the same indexId, but apparently this is not the case.

Thanks,

On 27 Sep 2017, at 17:16, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

Hi Yunus,

I am not sure if I understand the question correctly.

Am I correct to assume that you want the following?

  ---------------------------------------------------------> time

  ProcessA                             ProcessB

  Task1:  W(3) E(1) E(2)  E(5)         W(3) W(7) E(1) E(2)  E(5)

  Task2:  W(7) E(3) E(10) E(6)         W(3) W(7) E(3) E(10) E(6)


In the above, elements flow from left to right; W() stands for a watermark and E() for an element.
In other words, between Process(TaskA) and Process(TaskB) you want to forward only the elements, but broadcast the watermarks, right?

If this is the case, a trivial solution would be to set the parallelism of TaskB to 1, so that all elements go through the same node.
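In DataStream terms that is simply the following (a sketch, reusing the source/taskA/taskB names from your snippet below):

source.process(taskA)
      .process(taskB)
      .setParallelism(1);   // applies to taskB: a single subtask sees all elements and watermarks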

Another solution is what you did, BUT with a custom partitioner you cannot use keyed state in your process function B, because the stream is no longer keyed.

A similar approach to what you did, but without the limitation above: in the first processFunction (TaskA) you can append the taskId to the elements themselves and then do a keyBy(taskId) between the first and the second process function.
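In code this could look roughly like the sketch below (untested; MyEvent stands in for your element type, a Tuple2 is used as the carrier for the taskId, and taskB then receives the tagged tuples):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// taskA tags each element with the index of the subtask that produced it.
DataStream<Tuple2<Integer, MyEvent>> tagged = source
    .process(new ProcessFunction<MyEvent, Tuple2<Integer, MyEvent>>() {
        @Override
        public void processElement(MyEvent value, Context ctx,
                                   Collector<Tuple2<Integer, MyEvent>> out) {
            out.collect(Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value));
        }
    });

// Keying by the attached taskId keeps the stream keyed, so keyed state works downstream.
tagged.keyBy(0)
      .process(taskB);   // taskB now receives Tuple2<Integer, MyEvent>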

These are the solutions that I can come up with, assuming that you want to do what I described.

But in general, could you please describe your use case in a bit more detail?
This way we may figure out another approach to achieve your goal.
In fact, I am not sure that you gain anything by broadcasting the watermark, other than
re-implementing (to some extent) Flink's windowing mechanism.

Thanks,
Kostas

On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunolgun@gmail.com> wrote:

Hi,

I have a simple streaming job such as:

source.process(taskA)
      .process(taskB)

I want taskB to access the minimum watermark of all parallel taskA instances, but the data is ordered and should not be shuffled. ForwardPartitioner uses the watermark of only one predecessor, so I have used a custom partitioner.

source.process(taskA)
      .map(AssignPartitionID)
      .partitionCustom(IdPartitioner)
      .map(StripPartitionID)
      .process(taskB)

In the AssignPartitionID function, I attach getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. In IdPartitioner, I return this partitionId.
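A simplified sketch of what these two helpers amount to (illustrative types and names, with a Tuple2 as the carrier):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Tags each record with the index of the taskA subtask that produced it.
public class AssignPartitionID<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
    @Override
    public Tuple2<Integer, T> map(T value) {
        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

// Sends each record to the downstream channel with the same index.
// This relies on taskA and taskB having the same parallelism.
public class IdPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key;
    }
}

They are wired up with partitionCustom(new IdPartitioner(), 0), i.e. using field 0 (the attached id) as the key; depending on the element type, Flink may also need a type hint via returns() for the generic Tuple2.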

This solved the main requirement, but I have another concern now:

Network shuffle: I don't need a network shuffle. I thought that within a taskmanager the indexId of taskA subtasks would be the same as the indexId of taskB subtasks. Unfortunately, they are not. Is there a way to make partitionCustom distribute data like ForwardPartitioner, to the next local operator?

As far as I know, this still requires object serialization/deserialization since the operators can't be chained anymore. Is there a way to get the minimum watermark from upstream operators without a network shuffle and object serialization/deserialization?

Regards,

