beam-user mailing list archives

From William McCarthy <williammccar...@gmail.com>
Subject Re: Output from Beam (on Flink) to Kafka
Date Mon, 21 Mar 2016 19:31:25 GMT
Hi Raghu,

Sure, I’ll run this this evening and get back to you.

Bill

> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <rangadi@google.com> wrote:
> 
> Thanks Max.
> 
> Bill McCarthy, 
> I know you are unblocked and KafkaWriter is good enough. Please try KafkaIO source from
> my branch with Flink runner if you get a chance.
> 
> thanks,
> Raghu.
> 
> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
> Thanks for the update Max !
> 
> Regards
> JB
> 
> 
> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
> FYI: The Runner registration has been fixed. The Flink runner
> explicitly registers as of [1]. Also, the SDK tries to look up the
> PipelineRunner class in case it has not been registered [2].
> 
> [1] https://github.com/apache/incubator-beam/pull/40
> [2] https://github.com/apache/incubator-beam/pull/61
> 
> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <mxm@apache.org> wrote:
> Great to see such a lively discussion here.
> 
> I think we'll support sinks through the Write interface (like in
> batched execution) and also have a dedicated wrapper for the Flink
> sinks. This is a very pressing but easy to solve issue of the Flink
> runner. Expect it to be in next week.
> 
> Also, the proper registration of the runner is about to be merged.
> We just need an ok from the contributor to merge the changes.
> 
> Best,
> Max
> 
> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <dhalperi@google.com> wrote:
> Thanks Bill!
> 
> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
> blocking you!
> 
> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
> <williammccarthy@gmail.com> wrote:
> 
> I tried that, but still no dice. Just to be clear, it’s not a blocker for
> me, given that I have my example running, but for your information the
> exception is below.
> 
> I’ll watch the commit log on the beam incubator and look forward to
> deleting my copy of Raghu’s contributions when they’re merged to master.
> 
> Thanks again for everyone’s help,
> 
> Bill
> 
> 
> Command followed by exception:
> 
> $ flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar
> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
> 
> ------------------------------------------------------------
>   The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> ... 6 more
> 
> On Mar 18, 2016, at 5:35 PM, Thomas Groh <tgroh@google.com> wrote:
> 
> I don't believe the FlinkPipelineRunner is registered the same way the
> Dataflow & Direct Pipeline runners are registered; using
> org.apache.beam.runners.flink.FlinkPipelineRunner should work
> 
> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
> <williammccarthy@gmail.com> wrote:
> 
> Thanks Dan,
> 
> I tried that, but I’m getting the error below. Note that the jar contains the
> FlinkPipelineRunner.
> 
> 
> 
> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
> org/apache/beam/runners/flink/FlinkPipelineRunner.class
> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
> org/apache/beam/runners/flink/FlinkPipelineOptions.class
> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
> 
> % flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
> 
> ------------------------------------------------------------
>   The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> ... 6 more
> 
> 
> 
> On Mar 18, 2016, at 5:00 PM, Dan Halperin <dhalperi@google.com> wrote:
> 
> Thanks for catching that, Aljoscha!
> 
> Note that the Flink runner should be available via a command-line option
> as well: --runner=FlinkPipelineRunner.
> 
> The list of valid values for that flag is computed by walking the
> classpath at runtime, so as long as the Flink jar is present it'll work.
> 
> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi,
> looks like the example is being executed with the DirectPipelineRunner
> which does not seem to be able to cope with UnboundedSource. You need to set
> the runner to the FlinkRunner in the example code as described here:
> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
> 
> The Flink runner should be able to deal with UnboundedSource but has the
> limitation that sources are always parallelism=1 (this is being worked on,
> however).
> 
> Cheers,
> Aljoscha
> On 18 Mar 2016, at 20:56, Dan Halperin <dhalperi@google.com> wrote:
> 
> Looks like the Flink runner may not yet support arbitrary code written
> with the UnboundedSource API. That is, it looks like the Flink runner
> expects the sources to get translated away.
> 
> Max?
> 
> Dan
> 
> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
> <williammccarthy@gmail.com> wrote:
> Thanks Raghu,
> 
> When I try to run it on flink using the incubator-beam code, i.e.
> 
> flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
> --topics=test_in --outputTopic=test_out
> 
> I get this:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>        ... 6 more
> 
> Any ideas?
> 
> Bill
> 
> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <rangadi@google.com> wrote:
> 
> Thanks for trying it.
> 
> I fixed the CheckStyle error (not sure why my build is not failing).
> Let me know if you see any issues running with Beam. I haven't tried it. I
> should. In fact Daniel Halperin says my patch should be against Beam.
> 
> Raghu.
> 
> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
> <williammccarthy@gmail.com> wrote:
> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
> pointing me to working code.
> 
> I’m in the middle of a hack day at the moment, so the speed of your
> responses has been very welcome.
> 
> In the first instance, I’ll try using your changes, Raghu. I’ve
> cloned your repo, switched to the kafka branch and built both contrib/kafka
> and contrib/examples/kafka. The contrib/kafka initially failed with a
> CheckStyle error
> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
> in my local clone and now it’s building fine. I hope to be able to run your
> contrib unchanged on top of the incubator-beam codebase, which will be what
> I attempt to do now.
> 
> Thanks again to all, for your swift help.
> 
> Bill
> 
> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <rangadi@google.com> wrote:
> 
> Hi Bill,
> 
> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
> merged soon. The example there keeps track of top hashtags in a 10-minute
> sliding window and writes the results to another Kafka topic. Please try it
> if you can. It is well tested on Google Cloud Dataflow. I have not run it
> using the Flink runner.
> 
> Raghu.
> 
> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
> <k.kloudas@data-artisans.com> wrote:
> Hello Bill,
> 
> This is a known limitation of the Flink Runner.
> There is a JIRA issue for that
> https://issues.apache.org/jira/browse/BEAM-127
> 
> A wrapper for Flink sinks will come soon and as Beam evolves,
> a more Beam-y solution will come as well.
> 
> Kostas
> On Mar 18, 2016, at 5:23 PM, William McCarthy
> <williammccarthy@gmail.com> wrote:
> 
> Hi,
> 
> I’m trying to write a proof-of-concept which takes messages from
> Kafka, transforms them using Beam on Flink, then pushes the results onto a
> different Kafka topic.
> 
> I’ve used the KafkaWindowedWordCountExample as a starting point,
> and that’s doing the first part of what I want to do, but it outputs to text
> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t
> figure out how to plug it into the pipeline. I was thinking that it would be
> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
> exist.
> 
> Any advice or thoughts on what I’m trying to do?
> 
> I’m running the latest incubator-beam (as of last night from
> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
> Compute Engine (Debian Jessie).
> 
> Thanks,
> 
> Bill McCarthy
> 
> 
> -- 
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
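
For anyone who hits the same "Unknown 'runner' specified" error, here is a rough sketch of the lookup behaviour Max describes earlier in the thread: try the registered short names first, then fall back to loading the --runner value as a fully qualified class name. This is not the Beam SDK code. RunnerLookupSketch, Runner, DirectRunnerStub, UnregisteredRunnerStub and resolve are all hypothetical names made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunnerLookupSketch {
    /** Hypothetical stand-in for the SDK's runner base type (not the Beam class). */
    public abstract static class Runner {}

    /** A runner that has been registered under its short name. */
    public static class DirectRunnerStub extends Runner {}

    /** A runner that is on the classpath but was never registered. */
    public static class UnregisteredRunnerStub extends Runner {}

    // Short name -> class, filled in by whatever registration mechanism is used.
    private static final Map<String, Class<? extends Runner>> REGISTERED = new LinkedHashMap<>();
    static {
        REGISTERED.put("DirectRunnerStub", DirectRunnerStub.class);
    }

    /** Resolves a --runner value: registered short name first, class name second. */
    public static Class<? extends Runner> resolve(String name) {
        Class<? extends Runner> known = REGISTERED.get(name);
        if (known != null) {
            return known;
        }
        try {
            // Fallback: treat the value as a fully qualified class name.
            Class<?> candidate = Class.forName(name);
            if (Runner.class.isAssignableFrom(candidate)) {
                return candidate.asSubclass(Runner.class);
            }
        } catch (ClassNotFoundException e) {
            // Not loadable as a class either; report it below.
        }
        throw new IllegalArgumentException("Unknown 'runner' specified '" + name
                + "', supported pipeline runners " + REGISTERED.keySet());
    }

    public static void main(String[] args) {
        System.out.println(resolve("DirectRunnerStub").getSimpleName());
        // Unregistered, but found through its fully qualified name.
        System.out.println(resolve(UnregisteredRunnerStub.class.getName()).getSimpleName());
        try {
            resolve("UnregisteredRunnerStub"); // short name of an unregistered runner
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Without the fallback branch, both the short name and the fully qualified name of an unregistered runner are rejected, which matches the two exceptions Bill posted.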

