From: William McCarthy
Subject: Re: Output from Beam (on Flink) to Kafka
Date: Mon, 21 Mar 2016 15:31:25 -0400
To: user@beam.incubator.apache.org

Hi Raghu,

Sure, I’ll run this this evening and get back to you.

Bill

On Mar 21, 2016, at 12:23 PM, Raghu Angadi <rangadi@google.com> wrote:

Thanks Max.

Bill McCarthy, 
I know you are unblocked and the KafkaWriter is good enough. Please try the KafkaIO source from my branch with the Flink runner if you get a chance.

thanks,
Raghu.
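
The KafkaWriter mentioned above is Bill’s own stop-gap rather than code that appears in this thread. A minimal sketch of that kind of DoFn-based writer, assuming the Dataflow SDK DoFn API and the Kafka 0.9 producer (class and field names here are illustrative):

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

// Illustrative stop-gap writer: one producer per bundle, fire-and-forget sends.
public class KafkaWriterFn extends DoFn<String, Void> {
  private final String bootstrapServers;
  private final String topic;
  private transient KafkaProducer<String, String> producer;

  public KafkaWriterFn(String bootstrapServers, String topic) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
  }

  @Override
  public void startBundle(Context c) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
  }

  @Override
  public void processElement(ProcessContext c) {
    // A real writer would check the send result and flush before the bundle finishes.
    producer.send(new ProducerRecord<String, String>(topic, c.element()));
  }

  @Override
  public void finishBundle(Context c) {
    producer.close();
  }
}

It would be attached at the end of the pipeline with something like ParDo.of(new KafkaWriterFn(bootstrapServers, topic)).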

On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
Thanks for the update Max !

Regards
JB


On 03/21/2016 02:39 PM, Maximilian Michels wrote:
FYI: The Runner registration has been fixed. The Flink runner
explicitly registers as of [1]. Also, the SDK tries to look up the
PipelineRunner class in case it has not been registered [2].

[1] https://github.com/apache/incubator-beam/pull/40
[2] https://github.com/apache/incubator-beam/pull/61
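
The explicit registration in [1] boils down to a ServiceLoader hook that the SDK picks up when it builds its list of known runners. A rough sketch of such a registrar, assuming the SDK’s PipelineRunnerRegistrar/AutoService mechanism (the interface shape here is paraphrased from memory, not copied from the patch):

import com.google.auto.service.AutoService;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkPipelineRunner;

// Sketch of a runner registrar: AutoService writes the META-INF/services entry
// so the options factory can discover the runner by its simple name.
@AutoService(PipelineRunnerRegistrar.class)
public class FlinkRunnerRegistrarSketch implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
  }
}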

On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <mxm@apache.org> wrote:
Great to see such a lively discussion here.

I think we'll support sinks through the Write interface (like in
batched execution) and also have a dedicated wrapper for the Flink
sinks. This is a very pressing but easy-to-solve issue with the Flink
runner. Expect it to land next week.

Also, the proper registration of the runner is about to be merged.
We just need an ok from the contributor to merge the changes.

Best,
Max

On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <dhalperi@google.com> wrote:
Thanks Bill!

Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
blocking you!

On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
<williammccarthy@gmail.com> wrote:

I tried that, but still no dice. Just to be clear, it’s not a blocker for
me, given that I have my example running, but for your information the
exception is below.

I’ll watch the commit log on the Beam incubator and look forward to
deleting my copy of Raghu’s contributions when they’re merged to master.

Thanks again for everyone’s help,

Bill


Command followed by exception:

$ flink run -c
com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
target/beam-1.0-SNAPSHOT.jar
--runner=org.apache.beam.runners.flink.FlinkPipelineRunner
--bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
  The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
        at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more

On Mar 18, 2016, at 5:35 PM, Thomas Groh <tgroh@google.com> wrote:

I don't believe the FlinkPipelineRunner is registered the same way the
Dataflow & Direct Pipeline runners are registered; using
org.apache.beam.runners.flink.FlinkPipelineRunner should work

On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
<williammccarthy@gmail.com> wrote:

Thanks Dan,

I tried that, but I'm getting the below. Note that the jar contains the
FlinkPipelineRunner.



% jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
org/apache/beam/runners/flink/FlinkPipelineRunner.class
org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
org/apache/beam/runners/flink/FlinkPipelineOptions.class
org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class

% flink run -c
com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
--bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
  The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
        at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more



On Mar 18, 2016, at 5:00 PM, Dan Halperin <dhalperi@google.com> wrote:

Thanks for catching that, Aljoscha!

Note that the Flink runner should be available via a command-line option
as well: --runner=FlinkPipelineRunner.

The list of valid values for that flag is computed by walking the
classpath at runtime, so as long as the Flink jar is present it'll work.

On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

Hi,
looks like the example is being executed with the DirectPipelineRunner
which does not seem to be able to cope with UnboundedSource. You need to set
the runner to the FlinkRunner in the example code as described here:
https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
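
For reference, the programmatic route that README describes amounts to roughly the sketch below, using only the package names visible elsewhere in this thread; treating FlinkPipelineOptions as exposing a streaming flag is an assumption here:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;

public class FlinkRunnerSetupSketch {
  public static void main(String[] args) {
    // Parse the command-line flags and force the Flink runner explicitly,
    // which sidesteps the "Unknown 'runner'" lookup problem entirely.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);
    options.setStreaming(true);  // assumption: unbounded (Kafka) sources need streaming mode

    Pipeline p = Pipeline.create(options);
    // ... build the transforms here ...
    p.run();
  }
}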

The Flink runner should be able to deal with UnboundedSource but has the
limitation that sources are always parallelism=1 (this is being worked on,
however).

Cheers,
Aljoscha
On 18 Mar 2016, at 20:56, Dan Halperin <dhalperi@google.com> wrote:

Looks like the Flink runner may not yet support arbitrary code written
with the UnboundedSource API. That is, it looks like the Flink runner
expects the sources to get translated away.

Max?

Dan

On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
<williammccarthy@gmail.com> wrote:
Thanks Raghu,

When I try to run it on Flink using the incubator-beam code, i.e.

flink run -c
com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
--topics=test_in --outputTopic=test_out

I get this:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more

Any ideas?

Bill

On Mar 18, 2016, at 2:47 PM, Raghu Angadi <rangadi@google.com> wrote:

Thanks for trying it.

I fixed the CheckStyle error (not sure why my build is not failing).
Let me know if you see any issues running with Beam. I haven't tried it. I
should. In fact, Daniel Halperin says my patch should be against Beam.

Raghu.

On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
<williammccarthy@gmail.com> wrote:
Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
pointing me to working code.

I’m in the middle of a hack day at the moment, so the speed of your
responses has been very welcome.

In the first instance, I’ll try using your changes, Raghu. I’ve
cloned your repo, switched to the kafka branch and built both contrib/kafka
and contrib/examples/kafka. The contrib/kafka build initially failed with a
CheckStyle error
(/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
'private' modifier out of order with the JLS suggestions)… I’ve fixed that
in my local clone and now it’s building fine. I hope to be able to run your
contrib unchanged on top of the incubator-beam codebase, which will be what
I attempt to do now.
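
That CheckStyle rule only concerns the JLS-recommended modifier order, so the fix is a one-line reordering of the kind shown here (the declaration is illustrative, not the actual line 683 of KafkaIO.java):

// Before -- flagged: 'private' modifier out of order with the JLS suggestions
static private final int DEFAULT_BUFFER_SIZE = 1024;

// After -- JLS order: annotations, then public/protected/private, abstract, static, final, ...
private static final int DEFAULT_BUFFER_SIZE = 1024;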

Thanks again to all for your swift help.

Bill

On Mar 18, 2016, at 12:55 PM, Raghu Angadi <rangadi@google.com>
wrote:

Hi Bill,

We have a fairly well-tested patch for KafkaIO (PR #121). It will be
merged soon. The example there keeps track of top hashtags in a 10-minute
sliding window and writes the results to another Kafka topic. Please try it
if you can. It is well tested on Google Cloud Dataflow. I have not run it
using the Flink runner.
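
For anyone sketching something similar before that PR lands, the windowing-and-counting core looks roughly like the snippet below. The Kafka read and write from the branch are replaced here with Create.timestamped and a printing DoFn so the sketch stays self-contained, and the one-minute slide period is an arbitrary choice:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.util.Arrays;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class HashtagCountSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.timestamped(Arrays.asList(                    // stand-in for the KafkaIO source
            TimestampedValue.of("#beam", new Instant(0L)),
            TimestampedValue.of("#flink", new Instant(0L)),
            TimestampedValue.of("#beam", new Instant(60000L)))))
     .apply(Window.<String>into(
            SlidingWindows.of(Duration.standardMinutes(10))      // 10-minute windows...
                          .every(Duration.standardMinutes(1))))  // ...sliding every minute
     .apply(Count.<String>perElement())                          // hashtag -> count per window
     .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {        // stand-in for the Kafka sink
        @Override
        public void processElement(ProcessContext c) {
          System.out.println(c.element().getKey() + " = " + c.element().getValue());
        }
     }));

    p.run();
  }
}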

Raghu.

On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
<k.kloudas@data-artisans.com> wrote:
Hello Bill,

This is a known limitation of the Flink Runner.
There is a JIRA issue for that
https://issues.apache.org/jira/browse/BEAM-127

A wrapper for Flink sinks will come soon, and as Beam evolves,
a more Beam-y solution will come as well.

Kostas
On Mar 18, 2016, at 5:23 PM, William McCarthy
<williammccarthy@gmail.com> wrote:

Hi,

I’m trying to write a proof-of-concept which takes messages from
Kafka, transforms them using Beam on Flink, then pushes the results onto a
different Kafka topic.

I’ve used the KafkaWindowedWordCountExample as a starting point,
and that’s doing the first part of what I want to do, but it outputs to text
files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t
figure out how to plug it into the pipeline. I was thinking that it would be
wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
exist.

Any advice or thoughts on what I’m trying to do?

I’m running the latest incubator-beam (as of last night from
GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
Compute Engine (Debian Jessie).

Thanks,

Bill McCarthy

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com