beam-user mailing list archives

From Kenneth Knowles <...@google.com>
Subject Re: KafkaIO Example
Date Thu, 12 Jan 2017 22:14:30 GMT
Hi Naveen,

I have just successfully compiled the code from your most recent email so I
suspect the error lies elsewhere.

Taking a wild guess, the error you are getting would be expected if a
transform were upcast to the rawtype PTransform, as its raw output type is
POutput.
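
To illustrate that guess, here is a minimal sketch. It does not use the real
Beam classes: Output, Coll and Transform are hypothetical stand-ins for
POutput, PCollection and PTransform, and apply() only mimics the general shape
of Beam's apply methods. It shows how passing a raw-typed transform to a
generic method erases the inferred result type down to its bound.

```java
import java.util.Arrays;
import java.util.List;

public class RawTypeDemo {
    // Hypothetical stand-ins for POutput, PCollection<T> and PTransform<I, O>.
    public interface Output {}

    public static final class Coll<T> implements Output {
        public final List<T> items;
        public Coll(List<T> items) { this.items = items; }
    }

    public interface Transform<I, O extends Output> {
        O expand(I input);
    }

    // The result type O is taken from the transform's type arguments.
    public static <I, O extends Output> O apply(I input, Transform<I, O> t) {
        return t.expand(input);
    }

    // Fully typed transform: O is inferred as Coll<String>.
    public static Coll<String> typed(List<String> in) {
        Transform<List<String>, Coll<String>> t = Coll::new;
        return apply(in, t);
    }

    // Same transform, upcast to the raw type. The call becomes an unchecked
    // invocation, so its static result type is the erasure of O, i.e. Output.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static Output raw(List<String> in) {
        Transform<List<String>, Coll<String>> t = Coll::new;
        Transform rawT = t;
        // Coll<String> c = apply(in, rawT);
        //   would fail: "incompatible types: Output cannot be converted to Coll<String>"
        return apply(in, rawT);
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b");
        System.out.println(typed(data).items);
        System.out.println(raw(data) instanceof Coll);
    }
}
```

At runtime both calls return the same kind of object; only the compile-time
type differs. Chaining another apply(...) onto the raw result would fail in
the same way as the "cannot find symbol ... location: interface POutput"
error quoted below.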

Kenn

On Thu, Jan 12, 2017 at 12:55 PM, Madhire, Naveen <
Naveen.Madhire@capitalone.com> wrote:

> I changed the code to see where the problem is with incompatible
> parameters.
>
>
>
> I get the error below. Is there an issue with the withoutMetadata() method,
> since KV<K, V> doesn't implement POutput?
>
>
>
> java: incompatible types: org.apache.beam.sdk.values.POutput cannot be
> converted to
> org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>>
>
>
>
> PCollection<KV<String, String>> kafkaRead = p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata());
>
> kafkaRead
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 return input.getKey() + "  " + input.getValue();
>             }
>         }))
>         .apply(TextIO.Write.to("Location"));
>
>
>
>
>
>
>
> Thanks,
>
> Naveen
>
>
>
> From: "Madhire, Naveen" <Naveen.Madhire@capitalone.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, January 12, 2017 at 11:55 AM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO Example
>
>
>
> Yes, I’ve verified the kafka connectivity outside of Beam. I wanted to see
> how it works if I read kafka data using Beam.
>
>
>
> I changed the code to use the default direct runner and am now getting a
> compilation error:
>
>
>
>
>
> java: cannot find symbol
>   symbol:   method apply(java.lang.String,org.apache.beam.sdk.transforms.MapElements<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>,java.lang.String>)
>   location: interface org.apache.beam.sdk.values.POutput
>
>
>
> Am I missing any other dependency in Pom? Please let me know.
>
>
>
> Code:
>
>
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Read from Kafka Topic
> try {
>     InputStream props = Resources.getResource("client.properties").openStream();
>     Properties properties = new Properties();
>     properties.load(props);
>
>     p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata())
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 return input.getKey() + "  " + input.getValue();
>             }
>         }))
>         .apply(TextIO.Write.to("Location"));
> }
>
>
>
> I’ve the below dependencies included in my pom,
>
>
>
>
>
> <!-- Apache Beam Dependencies -->
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-core-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
>
> <!-- Business logic -->
> <dependency>
>     <groupId>org.drools</groupId>
>     <artifactId>drools-core</artifactId>
>     <version>6.2.0.Final</version>
> </dependency>
>
>
>
> Thanks,
>
> Naveen
>
>
>
> From: Lukasz Cwik <lcwik@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, January 12, 2017 at 10:58 AM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO Example
>
>
>
> I'm assuming you mean DirectRunner and not DataflowDirectRunner.
>
>
>
> On Wed, Jan 11, 2017 at 4:23 PM, Raghu Angadi <rangadi@google.com> wrote:
>
>
>
> On Wed, Jan 11, 2017 at 1:56 PM, Madhire, Naveen <
> Naveen.Madhire@capitalone.com> wrote:
>
> I can confirm the authorization and authentication to Kafka is behaving
> correctly,
>
>
>
> Did you confirm this within the Beam/KafkaIO context or outside? When I last
> checked, Kafka requires the credentials file to be available on the local
> filesystem where the KafkaConsumer runs. All the workers need to have this
> file. Shipping these files to workers depends on the runner/execution
> environment.
>
>
>
> Can you try with DataflowDirectRunner? That would be simpler to debug.
>
>
>
> Raghu.
>
>
>
>
>
>
> ------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
