apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chaitanya Chebolu <chaita...@datatorrent.com>
Subject Re: Problem reading from KafkaConsoleProducer topic using Apex..
Date Fri, 05 May 2017 11:10:43 GMT
@Sushil Could you please share the container logs ?

On Fri, May 5, 2017 at 4:33 PM, Sushil Apex <sushil.aapex@gmail.com> wrote:

> To add here
> I am using Cloudera VM for hdfs and running Kafka from apache with single
> machine zookeeper
>
> On Fri, May 5, 2017 at 4:32 PM, Sushil Apex <sushil.aapex@gmail.com>
> wrote:
>
>> @vikram, I am using apex cli
>> not sure where ts logs will be generated, checked /var/log/hadoop-yarn
>> but did found any updated logs for apex
>>
>> I have changed the application name but that didn't helped
>>
>> On Fri, May 5, 2017 at 4:03 PM, vikram patil <patilvikram@gmail.com>
>> wrote:
>>
>>> Can you share your Apex logs?  Have you launched an application using
>>> apex cli?
>>>
>>> Also, can you try once by changing name of your application and
>>> relaunch again. Please kill or shut down your existing running
>>> application before this.
>>>
>>> -Vikram
>>>
>>>
>>>
>>>
>>> On Fri, May 5, 2017 at 3:56 PM, Sushil Apex <sushil.aapex@gmail.com>
>>> wrote:
>>> > Tried with EARLIEST option also, no luck :(
>>> >
>>> > On Fri, May 5, 2017 at 3:49 PM, Sushil Apex <sushil.aapex@gmail.com>
>>> wrote:
>>> >>
>>> >> @Vikram the folder /tmp/FromKafka is already created.
>>> >> @Chaitanya yes I am pusing messages in kafka topic using
>>> consoleProducer
>>> >>  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> >> kafka2hdfs
>>> >>
>>> >> @Ajay
>>> >> will try with the initial offset to EARLIEST.
>>> >>
>>> >> Thank you
>>> >> Sushil
>>> >>
>>> >> On Fri, May 5, 2017 at 2:54 PM, AJAY GUPTA <ajaygit158@gmail.com>
>>> wrote:
>>> >>>
>>> >>> Also, you may be required to specify the initial offset to EARLIEST.
>>> >>>
>>> >>> Ajay
>>> >>>
>>> >>> On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvikram@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> Hi Sushil,
>>> >>>>
>>> >>>> Have you provided configuration specifying hdfs directory and
file
>>> for
>>> >>>> an application?
>>> >>>> You may have to create /tmp/fromKafka directory in hdfs.
>>> >>>> Thanks & Regards,
>>> >>>> Vikram
>>> >>>>
>>> >>>> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aapex@gmail.com
>>> >
>>> >>>> wrote:
>>> >>>> >
>>> >>>> > I am using Apex 3.5.0 and kafka 0.9
>>> >>>> > malhar library used is 3.6.0
>>> >>>> >
>>> >>>> > I am following the example from
>>> >>>> > https://github.com/DataTorrent/examples/blob/master/tutorial
>>> s/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>>> >>>> >
>>> >>>> > I am putting messages in Kafka topic using consolekafkaProducer
>>> >>>> > provided by Kafka, but I am not able to read these messages
in
>>> Apex
>>> >>>> > DAG(created based on above link).
>>> >>>> >
>>> >>>> > I am running the apex apa file through apex-cli
>>> >>>> >
>>> >>>> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch:
>>> >>>> > 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
>>> >>>> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
>>> >>>> >   1. Kafka2HDFS
>>> >>>> >   2. MyFirstApplication
>>> >>>> > Choose application: 1
>>> >>>> >
>>> >>>> > and nothing happens after this, I can see the messages
put in
>>> >>>> > consoleConsumer in kafka logs
>>> >>>> >
>>> >>>> > Properties used are
>>> >>>> >
>>> >>>> > <property>
>>> >>>> >   <name>dt.operator.kafkaIn.prop.topics</name>
>>> >>>> >   <value>kafka2hdfs</value>
>>> >>>> > </property>
>>> >>>> >
>>> >>>> > <property>
>>> >>>> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>>> >>>> >   <value>localhost:2181</value>
>>> >>>> > </property>
>>> >>>> > <property>
>>> >>>> >   <name>dt.operator.kafkaIn.prop.clusters</name>
>>> >>>> >   <value>localhost:9092</value>
>>> >>>> > </property>
>>> >>>> > <property>
>>> >>>> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>>> >>>> >   <value>1</value>
>>> >>>> > </property>
>>> >>>> >
>>> >>>> >
>>> >>>> > Application Code
>>> >>>> >
>>> >>>> > KafkaSinglePortByteArrayInputOperator in
>>> >>>> >                 = dag.addOperator("kafkaIn", new
>>> >>>> > KafkaSinglePortByteArrayInputOperator());
>>> >>>> >
>>> >>>> >         LineOutputOperator out = dag.addOperator("fileOut",
new
>>> >>>> > LineOutputOperator());
>>> >>>> >
>>> >>>> >         dag.addStream("dataf", in.outputPort, out.input);
>>> >>>> >
>>> >>>> >
>>> >>>> > I am not able to understand what config I am missing here?
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>


-- 

*Chaitanya*

Software Engineer

E: chaitanya@datatorrent.com | Twitter: @chaithu1403

www.datatorrent.com  |  apex.apache.org

Mime
View raw message