apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Thomas Weise <thomas.we...@gmail.com>
Subject Re: Kafka input operator
Date Sun, 19 Jun 2016 22:29:38 GMT
kafka.message.Message is the problem, MutablePair has a no-arg constructor
and should be serializable for Kryo,


On Sun, Jun 19, 2016 at 3:10 PM, <hsy541@gmail.com> wrote:

> The Pairs in Apache common are not Kryo serializable. You can use other
> pair data structure. For example KeyValuePair in Malhar library
>
> Siyuan
>
> Sent from my iPhone
>
> On Jun 19, 2016, at 14:58, Raja.Aravapalli <Raja.Aravapalli@target.com>
> wrote:
>
>
> Hi Priyanka,
>
> I am writing to read the messages in the next operator with input port
> defined like the below,
>
> public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>
input = new DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>()
>
>
> Application is failing with below exception:
>
> 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec (DefaultStatefulStreamCodec.java:fromDataStatePair(98))
- Catastrophic Error: Execution halted due to Kryo exception!
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor):
kafka.message.Message
> Serialization trace:
> left (org.apache.commons.lang3.tuple.MutablePair)
> 	at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> 	at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> 	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>
>
>
> Any help please.
>
> Regards,
> Raja.
>
> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
> Date: Sunday, June 19, 2016 at 12:22 AM
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: Kafka input operator
>
>
> Thanks for the response Priyanka…
>
> But, when I try to put in my own package, some of the protected variables
> are not accessible!!!!
>
>
> Regards,
> Raja.
>
> From: Priyanka Gugale <priyanka@datatorrent.com>
> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
> Date: Saturday, June 18, 2016 at 10:29 AM
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: Kafka input operator
>
> Hi,
>
> Yes sure, you can use any package name you want. In fact better you put
> this class outside Malhar jar. Just keep the Malhar jar in your class path.
>
> -Priyanka
> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <Raja.Aravapalli@target.com>
> wrote:
>
>>
>> Hi Priyanka,
>>
>> Can this be done from a class outside the package “
>> com.datatorrent.contrib.kafka;” ?
>>
>> I don’t want to disturb the source :(
>>
>>
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>> Date: Friday, June 17, 2016 at 5:38 AM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: Kafka input operator
>>
>>
>> Hi Priyanka,
>>
>> I am using kafka version 0.8.x.
>>
>> Awesome. Yes. This is what is want. I shall test this and share my
>> updates. Having one kafka operator like this in Malhar, will be a very good
>> one. I don’t see such availability in Storm as well!!
>>
>>
>>
>> Regards,
>> Raja.
>>
>> From: Priyanka Gugale <priyanka@datatorrent.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Friday, June 17, 2016 at 2:05 AM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: Kafka input operator
>>
>> Hi Raja,
>>
>> I have quickly wrote an operator to fulfill your requirement. The code is
>> available here
>> <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
>> Let me know if this addresses your usecase.
>>
>> -Priyanka
>>
>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <
>> priyanka@datatorrent.com> wrote:
>>
>>> Hi Raja,
>>>
>>> You will need to update other places as well (I guess it's replay other
>>> than emitTuples) . But I think it is not feasible to replicate emitTuples
>>> code in subclass as many of the parent class variables are private. I would
>>> try to figure out if there is any other way.
>>>
>>> Can you please confirm which Kafka version you are using?
>>>
>>> -Priyanka
>>>
>>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <
>>> Raja.Aravapalli@target.com> wrote:
>>>
>>>>
>>>> Hi Chaitanya,
>>>>
>>>> Would the below changes you proposed enough to retrieve partition &
>>>> offset ?
>>>>
>>>> I see *emitTuple(Message msg) i*s being called at various places in
>>>> the code… please advise. Thank you.
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: "Raja.Aravapalli" <Raja.Aravapalli@target.com>
>>>> Date: Tuesday, June 14, 2016 at 9:50 PM
>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Subject: Re: Kafka input operator
>>>>
>>>>
>>>> Thanks for the response Chaitanya. I will follow the suggestions to
>>>> retrieve Kafka partitionId & offset!!
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Chaitanya Chebolu <chaitanya@datatorrent.com>
>>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Date: Monday, June 13, 2016 at 3:06 AM
>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Subject: Re: Kafka input operator
>>>>
>>>> Hi Raja,
>>>>
>>>>    I think you are using 0.8 version of kafka operator. There is no
>>>> such operator in Malhar.  To meet your requirement, please do as below:
>>>>
>>>>   Create a new class which extend from AbstractKafkaInputOperator.
>>>> Override the API "void emitTuples()" and create the output port of type
>>>> MutablePair<Message,MutablePair<long,int>>
>>>>
>>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the
>>>> below line:
>>>> emitTuple(message.msg) to
>>>> outputPort.emit(new MutablePair<>(message.getMsg(),new
>>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>>>>
>>>> Regards,
>>>> Chaitanya
>>>>
>>>>
>>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <
>>>> Raja.Aravapalli@target.com> wrote:
>>>>
>>>>>
>>>>> Hi
>>>>>
>>>>> Does anyone have an idea, if any of the existing kafka input operators
>>>>> give the ability to retrieve  kafka Partition ID & Offset a particular
>>>>> message came from, along with the messages ?
>>>>>
>>>>>
>>>>> Thanks a lot in advance.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>
>>>>
>>>
>>

Mime
View raw message