apex-users mailing list archives

From Raja.Aravapalli <Raja.Aravapa...@target.com>
Subject Re: Kafka input operator
Date Sun, 19 Jun 2016 05:22:15 GMT

Thanks for the response Priyanka…

But when I try to put the class in my own package, some of the protected variables are not accessible!


Regards,
Raja.

From: Priyanka Gugale <priyanka@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Saturday, June 18, 2016 at 10:29 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator


Hi,

Yes, sure, you can use any package name you want. In fact, it is better to keep this class outside
the Malhar jar. Just keep the Malhar jar on your classpath.

-Priyanka

On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <Raja.Aravapalli@target.com> wrote:

Hi Priyanka,

Can this be done from a class outside the package “com.datatorrent.contrib.kafka;” ?

I don’t want to disturb the source :(



Regards,
Raja.

From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
Date: Friday, June 17, 2016 at 5:38 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator


Hi Priyanka,

I am using Kafka version 0.8.x.

Awesome. Yes, this is what I want. I shall test this and share my updates. Having a Kafka
operator like this in Malhar would be very useful. I don’t see anything like it in
Storm either!



Regards,
Raja.

From: Priyanka Gugale <priyanka@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Friday, June 17, 2016 at 2:05 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I have quickly written an operator to fulfill your requirement. The code is available here: <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
Let me know if this addresses your use case.

-Priyanka

On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <priyanka@datatorrent.com> wrote:
Hi Raja,

You will need to update other places as well (I guess replay, in addition to emitTuples).
But I think it is not feasible to replicate the emitTuples code in a subclass, as many of the parent
class variables are private. I will try to figure out if there is any other way.

Can you please confirm which Kafka version you are using?

-Priyanka

On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Hi Chaitanya,

Would the changes you proposed below be enough to retrieve the partition & offset?

I see emitTuple(Message msg) is called in several places in the code… please advise.
Thank you.


Regards,
Raja.

From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
Date: Tuesday, June 14, 2016 at 9:50 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator


Thanks for the response Chaitanya. I will follow the suggestions to retrieve Kafka partitionId
& offset!!


Regards,
Raja.

From: Chaitanya Chebolu <chaitanya@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Monday, June 13, 2016 at 3:06 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: Kafka input operator

Hi Raja,

I think you are using the 0.8 version of the Kafka operator. No existing operator in Malhar
emits the partition ID and offset. To meet your requirement, please do the following:

Create a new class that extends AbstractKafkaInputOperator. Override the method "void
emitTuples()" and create an output port of type MutablePair<Message,MutablePair<Long,Integer>>
(Java generics need the boxed types Long and Integer rather than long and int).

Copy emitTuples() from AbstractKafkaInputOperator and change the line below:
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya
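
The shape of the tuple described above can be illustrated with a small, self-contained Java sketch. It does not use the Malhar or Kafka classes: Pair, KafkaMessage, and the Consumer-based outputPort are hypothetical stand-ins (for MutablePair, the consumer-side message wrapper, and an operator output port respectively), and the payload is a String rather than a Kafka Message. It only shows the per-message change from emitting the payload alone to emitting the payload together with its offset and partition ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PartitionOffsetSketch {

    // Hypothetical stand-in for org.apache.commons.lang3.tuple.MutablePair.
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    // Hypothetical stand-in for the consumer-side message wrapper.
    // A real payload would be a Kafka Message, not a String.
    static final class KafkaMessage {
        final String msg;
        final long offset;
        final int partitionId;

        KafkaMessage(String msg, long offset, int partitionId) {
            this.msg = msg;
            this.offset = offset;
            this.partitionId = partitionId;
        }
    }

    // Emits (payload, (offset, partitionId)) for each pending message
    // instead of the payload alone. Generic type arguments must be the
    // boxed Long and Integer; long and int are not allowed there.
    static void emitTuples(List<KafkaMessage> pending,
                           Consumer<Pair<String, Pair<Long, Integer>>> outputPort) {
        for (KafkaMessage message : pending) {
            Pair<Long, Integer> position =
                    new Pair<>(message.offset, message.partitionId);
            outputPort.accept(new Pair<>(message.msg, position));
        }
    }

    public static void main(String[] args) {
        List<KafkaMessage> pending = new ArrayList<>();
        pending.add(new KafkaMessage("hello", 42L, 3));
        // The lambda plays the role of a downstream operator.
        emitTuples(pending, t -> System.out.println(
                t.left + " offset=" + t.right.left + " partition=" + t.right.right));
    }
}
```

In a real operator the loop body would live inside the overridden emitTuples(), and the downstream operator would read the offset and partition ID from the right-hand pair.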


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Hi

Does anyone know whether any of the existing Kafka input operators can provide the
Kafka partition ID & offset that a particular message came from, along with the message
itself?


Thanks a lot in advance.


Regards,
Raja.


