Subject: Re: DeserializationSchema isEndOfStream usage?
From: David Kim <david.kim@braintreepayments.com>
To: user@flink.apache.org
Date: Thu, 21 Jan 2016 16:51:03 -0600

Hi Robert!

Thanks for reaching out. I ran into an issue and wasn't sure if this was due to a misconfiguration on my end or if this is a real bug. I have one DataStream and I'm sinking it to two different Kafka sinks. When the job starts, I run into this error:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.UnsupportedOperationException: The accumulator 'producer-record-retry-rate' already exists and cannot be added.
    at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
    at java.lang.Thread.run(Thread.java:745)

The particular accumulator the exception complains about changes, meaning it's not always 'producer-record-retry-rate' -- most likely due to the non-deterministic ordering of the collection. Any guidance appreciated!

I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.

The Flink code looks something like this:

val stream: DataStream[Foo] = ...

val kafkaA = new FlinkKafkaProducer08[Foo]...
val kafkaB = new FlinkKafkaProducer08[Foo]...

stream
  .addSink(kafkaA)

stream
  .addSink(kafkaB)
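For reference, here is a minimal self-contained sketch of a job with this shape. The broker address, topic names, and the use of String elements with SimpleStringSchema are illustrative assumptions, since the real constructor arguments are elided above:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TwoKafkaSinksJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Any source works; a handful of string elements keeps the sketch minimal.
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // Two producers for one stream. Broker list and topic names are assumed.
    val kafkaA = new FlinkKafkaProducer08[String](
      "localhost:9092", "topic-a", new SimpleStringSchema())
    val kafkaB = new FlinkKafkaProducer08[String](
      "localhost:9092", "topic-b", new SimpleStringSchema())

    // Adding both sinks to the same stream is what triggers the
    // "accumulator already exists" failure in FlinkKafkaProducerBase.open().
    stream.addSink(kafkaA)
    stream.addSink(kafkaB)

    env.execute("two-kafka-sinks")
  }
}

The collision suggests both producers register their metric accumulators under the same names in a shared per-task context, so it would presumably only surface when the two sinks end up running in the same task.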
Thanks,
David

On Wed, Jan 20, 2016 at 1:34 PM, Robert Metzger <rmetzger@apache.org> wrote:

> I've now merged the pull request. DeserializationSchema.isEndOfStream()
> should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.
>
> Please let me know if the updated code has any issues. I'll fix them asap.
>
> On Wed, Jan 13, 2016 at 5:06 PM, David Kim <david.kim@braintreepayments.com> wrote:
>
>> Thanks Robert! I'll be keeping tabs on the PR.
>>
>> Cheers,
>> David
>>
>> On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger <metrobert@gmail.com> wrote:
>>
>>> Hi David,
>>>
>>> In theory, isEndOfStream() is absolutely the right way to go for
>>> stopping data sources in Flink. That it's not working as expected is a
>>> bug. I have a pending pull request for adding a Kafka 0.9 connector,
>>> which fixes this issue as well (for all supported Kafka versions).
>>>
>>> Sorry for the inconvenience. If you want, you can check out the branch
>>> of the PR and build Flink yourself to get the fix. I hope I can merge
>>> the connector to master this week; then the fix will be available in
>>> 1.0-SNAPSHOT as well.
>>>
>>> Regards,
>>> Robert
>>>
>>> Sent from my iPhone
>>>
>>> On 11.01.2016, at 21:39, David Kim <david.kim@braintreepayments.com> wrote:
>>>
>>> Hello all,
>>>
>>> I saw that DeserializationSchema has an API "isEndOfStream()":
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>>>
>>> Can isEndOfStream be utilized to somehow terminate a streaming Flink
>>> job?
>>>
>>> I was under the impression that if we return "true" we can control when
>>> a stream closes. The use case I had in mind was controlling when
>>> unit/integration tests would terminate a Flink job. We can rely on the
>>> fact that a test/spec knows how many items it expects to consume and
>>> can then switch isEndOfStream to return true.
>>>
>>> Am I misunderstanding the intention of isEndOfStream?
>>>
>>> I also set a breakpoint on isEndOfStream and saw that it was never hit
>>> when using "FlinkKafkaConsumer082" to pass in a DeserializationSchema
>>> implementation.
>>>
>>> Currently testing on 1.0-SNAPSHOT.
>>>
>>> Cheers!
>>> David
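A minimal sketch of the test-termination pattern discussed above: a schema that flips isEndOfStream to true after a fixed number of records. The String payload, UTF-8 encoding, and record limit are assumptions; note that each parallel source subtask gets its own schema instance, so the limit applies per subtask:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.util.serialization.DeserializationSchema

// Deserializes UTF-8 strings and signals end-of-stream once `maxRecords`
// elements have been seen, letting a test job shut down on its own.
class BoundedStringSchema(maxRecords: Long) extends DeserializationSchema[String] {
  private var seen = 0L

  override def deserialize(message: Array[Byte]): String = {
    seen += 1
    new String(message, "UTF-8")
  }

  // With the fixed connectors, returning true here stops the Kafka source.
  override def isEndOfStream(nextElement: String): Boolean = seen >= maxRecords

  override def getProducedType: TypeInformation[String] =
    BasicTypeInfo.STRING_TYPE_INFO
}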
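And a sketch of wiring that schema into FlinkKafkaConsumer082 in an integration test. The ZooKeeper/broker addresses, topic, group id, and expected record count are placeholders:

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082

object BoundedConsumeExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181") // needed by the 0.8 consumer
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test-group")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume "test-topic" until 100 records have been seen per subtask.
    env
      .addSource(new FlinkKafkaConsumer082[String](
        "test-topic", new BoundedStringSchema(100), props))
      .print()

    // With the fixed connectors, execute() should return once every
    // source subtask has reported isEndOfStream == true.
    env.execute("bounded-consume")
  }
}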