beam-user mailing list archives

From Dan Halperin <dhalp...@google.com>
Subject Re: Output from Beam (on Flink) to Kafka
Date Fri, 18 Mar 2016 19:56:00 GMT
It looks like the Flink runner may not yet support arbitrary code written
with the UnboundedSource API; see
<https://github.com/apache/incubator-beam/blob/ef1e32deefb9886584556c7125e87b2873c63ebf/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L34>.
That is, the Flink runner appears to expect the sources to get translated
away.

Max?

Dan

On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <
williammccarthy@gmail.com> wrote:

> Thanks Raghu,
>
> When I try to run it on flink using the incubator-beam code, i.e.
>
> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> I get this:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
> at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
> at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> ... 6 more
>
> Any ideas?
>
> Bill
>
> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <rangadi@google.com> wrote:
>
> Thanks for trying it.
>
> I fixed the CheckStyle error (not sure why my build is not failing). Let
> me know if you see any issues running with Beam. I haven't tried it. I
> should. In fact, Daniel Halperin says my patch should be against Beam.
>
> Raghu.
>
> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <
> williammccarthy@gmail.com> wrote:
>
>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>> pointing me to working code.
>>
>> I’m in the middle of a hack day at the moment, so the speed of your
>> responses has been very welcome.
>>
>> In the first instance, I’ll try using your changes, Raghu. I’ve cloned
>> your repo, switched to the kafka branch and built both contrib/kafka and
>> contrib/examples/kafka. The contrib/kafka initially failed with a
>> CheckStyle error
>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
>> in my local clone and now it’s building fine. I hope to be able to run your
>> contrib unchanged on top of the incubator-beam codebase, which will be what
>> I attempt to do now.
>>
>> Thanks again to all, for your swift help.
>>
>> Bill
>>
>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <rangadi@google.com> wrote:
>>
>> Hi Bill,
>>
>> We have a fairly well-tested patch for KafkaIO (PR #121
>> <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/121>). It
>> will be merged soon. The example there keeps track of the top hashtags in a
>> 10-minute sliding window and writes the results to another Kafka topic. Please
>> try it if you can. It is well tested on Google Cloud Dataflow. I have not
>> run it using the Flink runner.
>>
>> Raghu.
>>
>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <
>> k.kloudas@data-artisans.com> wrote:
>>
>>> Hello Bill,
>>>
>>> This is a known limitation of the Flink Runner.
>>> There is a JIRA issue for that:
>>> https://issues.apache.org/jira/browse/BEAM-127
>>>
>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>> a more Beam-y solution will come as well.
>>>
>>> Kostas
>>>
>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <williammccarthy@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I’m trying to write a proof-of-concept which takes messages from Kafka,
>>> transforms them using Beam on Flink, then pushes the results onto a
>>> different Kafka topic.
>>>
>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and
>>> that’s doing the first part of what I want to do, but it outputs to text
>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I
>>> can’t figure out how to plug it into the pipeline. I was thinking that it
>>> would be wrapped with an UnboundedFlinkSink, or some such, but that doesn’t
>>> seem to exist.
>>>
>>> Any advice or thoughts on what I’m trying to do?
>>>
>>> I’m running the latest incubator-beam (as of last night from Github),
>>> Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine
>>> (Debian Jessie).
>>>
>>> Thanks,
>>>
>>> Bill McCarthy
>>>
>>>
>>>
>>
>>
>
>
