apex-users mailing list archives

From Priyanka Gugale <priya...@datatorrent.com>
Subject Re: Kafka input operator
Date Mon, 20 Jun 2016 08:28:44 GMT
Hi Raja,

I have opened a pull request
<https://github.com/apache/apex-malhar/pull/323/files> in Malhar that adds an
emitTuple method taking a KafkaMessage parameter to
AbstractKafkaInputOperator. KafkaMessage carries the partitionId and offset
along with the message. Once this is merged, you can override *only* the
emitTuple method in your subclass and convert the "Message" into a String or
byte array. That avoids both the variable-access problem and the
serialization issue.
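A minimal sketch of the kind of tuple this suggests. It assumes a hypothetical `KafkaTuple` POJO (not part of Malhar or the pull request) that replaces `kafka.message.Message` with its raw bytes plus the partition ID and offset. Every field is a primitive or a `byte[]`, and the class has a public no-arg constructor. That constructor is what Kryo's default instantiator needs, and it is exactly what the Kryo exception later in this thread reports `kafka.message.Message` as lacking. The `fromByteBuffer` helper assumes the payload arrives as a `java.nio.ByteBuffer`; in the Kafka 0.8 Scala client, `Message.payload()` returns one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical Kryo-friendly tuple (not part of Malhar) carrying a Kafka
 * payload together with the partition and offset it was read from.
 */
class KafkaTuple {
  // Only primitives and byte[]: Kryo can recreate all of these downstream.
  private byte[] payload;
  private int partitionId;
  private long offset;

  // Public no-arg constructor, needed by Kryo's default instantiator strategy.
  public KafkaTuple() {
  }

  public KafkaTuple(byte[] payload, int partitionId, long offset) {
    this.payload = payload;
    this.partitionId = partitionId;
    this.offset = offset;
  }

  /** Copies the readable bytes of buf without moving the caller's buffer position. */
  public static KafkaTuple fromByteBuffer(ByteBuffer buf, int partitionId, long offset) {
    if (buf == null) {
      // A message with a null payload: keep the origin, store no bytes.
      return new KafkaTuple(null, partitionId, offset);
    }
    ByteBuffer view = buf.duplicate(); // independent position/limit, shared content
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return new KafkaTuple(bytes, partitionId, offset);
  }

  public byte[] getPayload() {
    return payload;
  }

  public int getPartitionId() {
    return partitionId;
  }

  public long getOffset() {
    return offset;
  }

  /** Decodes the payload as UTF-8, or returns null when there is no payload. */
  public String payloadAsString() {
    return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
  }

  @Override
  public String toString() {
    return "KafkaTuple{partition=" + partitionId + ", offset=" + offset
        + ", payload=" + payloadAsString() + "}";
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    // Roughly the reflective lookup Kryo relies on: it fails without a no-arg constructor.
    KafkaTuple empty = KafkaTuple.class.getDeclaredConstructor().newInstance();
    System.out.println(empty);

    ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    KafkaTuple t = fromByteBuffer(buf, 3, 42L);
    System.out.println(t);
  }
}
```

Inside the overridden emitTuple, one would build a `KafkaTuple` from the message payload, `getOffset()` and `getKafkaPart().getPartitionId()` (the accessors used in Chaitanya's snippet further down the thread) and emit it on an output port typed `DefaultOutputPort<KafkaTuple>`. The exact KafkaMessage API is defined by the pull request and may differ. Copying the bytes out of the buffer keeps the tuple independent of Kafka classes, so only the Malhar side needs the Kafka jar.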

-Priyanka

On Mon, Jun 20, 2016 at 3:59 AM, Thomas Weise <thomas.weise@gmail.com>
wrote:

> kafka.message.Message is the problem. MutablePair has a no-arg constructor
> and should be serializable with Kryo.
>
>
> On Sun, Jun 19, 2016 at 3:10 PM, <hsy541@gmail.com> wrote:
>
>> The pair classes in Apache Commons are not Kryo-serializable. You can use
>> another pair data structure, for example KeyValuePair in the Malhar library.
>>
>> Siyuan
>>
>> On Jun 19, 2016, at 14:58, Raja.Aravapalli <Raja.Aravapalli@target.com>
>> wrote:
>>
>>
>> Hi Priyanka,
>>
>> I am trying to read the messages in the next operator, with the input port
>> defined as below:
>>
>> public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>> input =
>>     new DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>()
>> {
>>   @Override
>>   public void process(MutablePair<Message, MutablePair<Long, Integer>> tuple) { /* handle tuple */ }
>> };
>>
>>
>> The application fails with the exception below:
>>
>> 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: Execution halted due to Kryo exception!
>> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): kafka.message.Message
>> Serialization trace:
>> left (org.apache.commons.lang3.tuple.MutablePair)
>> 	at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>> 	at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>> 	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>
>>
>>
>> Any help would be appreciated.
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Sunday, June 19, 2016 at 12:22 AM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: Kafka input operator
>>
>>
>> Thanks for the response, Priyanka.
>>
>> But when I put the class in my own package, some of the protected variables
>> are not accessible!
>>
>>
>> Regards,
>> Raja.
>>
>> From: Priyanka Gugale <priyanka@datatorrent.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Saturday, June 18, 2016 at 10:29 AM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: Kafka input operator
>>
>> Hi,
>>
>> Yes, you can use any package name you want. In fact, it is better to put
>> this class outside the Malhar jar; just keep the Malhar jar on your classpath.
>>
>> -Priyanka
>> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>> wrote:
>>
>>>
>>> Hi Priyanka,
>>>
>>> Can this be done from a class outside the package
>>> "com.datatorrent.contrib.kafka"?
>>>
>>> I don't want to modify the source :(
>>>
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>>> Date: Friday, June 17, 2016 at 5:38 AM
>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>> Subject: Re: Kafka input operator
>>>
>>>
>>> Hi Priyanka,
>>>
>>> I am using Kafka version 0.8.x.
>>>
>>> Awesome, this is what I want. I shall test it and share my updates.
>>> Having a Kafka operator like this in Malhar would be very useful; I don't
>>> see one in Storm either!
>>>
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Priyanka Gugale <priyanka@datatorrent.com>
>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>> Date: Friday, June 17, 2016 at 2:05 AM
>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>> Subject: Re: Kafka input operator
>>>
>>> Hi Raja,
>>>
>>> I quickly wrote an operator to meet your requirement. The code
>>> is available here
>>> <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
>>> Let me know if this addresses your use case.
>>>
>>> -Priyanka
>>>
>>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <
>>> priyanka@datatorrent.com> wrote:
>>>
>>>> Hi Raja,
>>>>
>>>> You will need to update other places as well (I think replay, in addition
>>>> to emitTuples). But I don't think it is feasible to replicate the
>>>> emitTuples code in a subclass, as many of the parent class variables are
>>>> private. I will try to figure out whether there is another way.
>>>>
>>>> Can you please confirm which Kafka version you are using?
>>>>
>>>> -Priyanka
>>>>
>>>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <
>>>> Raja.Aravapalli@target.com> wrote:
>>>>
>>>>>
>>>>> Hi Chaitanya,
>>>>>
>>>>> Would the changes you proposed below be enough to retrieve the partition
>>>>> and offset?
>>>>>
>>>>> I see emitTuple(Message msg) is being called in various places in the
>>>>> code. Please advise. Thank you.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>>>>> Date: Tuesday, June 14, 2016 at 9:50 PM
>>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>>> Subject: Re: Kafka input operator
>>>>>
>>>>>
>>>>> Thanks for the response, Chaitanya. I will follow your suggestions to
>>>>> retrieve the Kafka partitionId and offset!
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Chaitanya Chebolu <chaitanya@datatorrent.com>
>>>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>>>> Date: Monday, June 13, 2016 at 3:06 AM
>>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>>> Subject: Re: Kafka input operator
>>>>>
>>>>> Hi Raja,
>>>>>
>>>>> I think you are using the 0.8 version of the Kafka operator. There is no
>>>>> operator in Malhar that does this. To meet your requirement, please do the
>>>>> following:
>>>>>
>>>>> Create a new class that extends AbstractKafkaInputOperator.
>>>>> Override the API "void emitTuples()" and create an output port of type
>>>>> MutablePair<Message, MutablePair<Long, Integer>>.
>>>>>
>>>>> Copy emitTuples() from AbstractKafkaInputOperator and change the
>>>>> following line:
>>>>>   emitTuple(message.msg);
>>>>> to
>>>>>   outputPort.emit(new MutablePair<>(message.getMsg(),
>>>>>       new MutablePair<>(message.getOffset(), message.getKafkaPart().getPartitionId())));
>>>>>
>>>>> Regards,
>>>>> Chaitanya
>>>>>
>>>>>
>>>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <
>>>>> Raja.Aravapalli@target.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Does anyone know whether any of the existing Kafka input operators can
>>>>>> retrieve the Kafka partition ID and offset that a particular message came
>>>>>> from, along with the message?
>>>>>>
>>>>>>
>>>>>> Thanks a lot in advance.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>
