storm-user mailing list archives

From Idan Fridman <idan.f...@gmail.com>
Subject Re: NullPointerException from deep inside Storm
Date Tue, 17 Feb 2015 17:18:56 GMT
Hi All,
I wonder if anyone could take a look at this? This exception keeps
occurring for me. Any leads, please?

2015-02-11 10:18 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:

> Hi,
> Here is the almost complete bolt I am using, which is causing that exception:
>
>
> public class AsyncBolt extends BaseRichBolt {
>
>
>     private AsyncHttpClient asyncHttpClient;
>     private OutputCollector outputCollector;
>
>
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("pushMessageResponse"));
>     }
>
>
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>         asyncHttpClient = new AsyncHttpClient();
>         outputCollector = collector;
>
>     }
>
>
>     @Override
>     public void execute(final Tuple tuple) {
>         final PushMessageRequestDTO pushMessageRequestDTO =
>                 (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>         String url = "some.url";
>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>             @Override
>             public Response onCompleted(Response response) throws Exception {
>                 PushMessageResponseDTO pushMessageResponseDTO =
>                         new PushMessageResponseDTO(pushMessageRequestDTO);
>                 emitTuple(pushMessageResponseDTO, tuple);
>                 return response;
>             }
>         });
>     }
>     // We are synchronizing on outputCollector because we have callbacks
>     // and we need to make sure all acks are called from the same thread.
>     private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO, Tuple tuple) {
>         synchronized (outputCollector) {
>             outputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>             outputCollector.ack(tuple);
>         }
>     }
> }
>
>
>
> 2015-02-10 20:27 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:
>
>> Yes, I cut those from this post for the sake of simplicity as well. I can
>> paste the whole bolt; it just has some internal logic.
>> On Feb 10, 2015 8:14 PM, "Michael Rose" <michael@fullcontact.com> wrote:
>>
>>> Your output fields declarer isn't invoked. Is it empty in your actual
>>> implementation as well?
>>>
>>> Perhaps you can post a gist with your full file?
>>>
>>> *Michael Rose*
>>> Senior Platform Engineer
>>> *Full*Contact | fullcontact.com
>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>> m: +1.720.837.1357 | t: @xorlev
>>>
>>> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <idan.frid@gmail.com>
>>> wrote:
>>>
>>>> I actually created a POJO of my own and emitted it. I edited the code here
>>>> for the sake of simplicity. So you think I should try converting the object
>>>> into a byte array or string and then emit it? What about synchronizing on
>>>> the OutputCollector? Does that make sense?
>>>> On Feb 10, 2015 8:00 PM, "Michael Rose" <michael@fullcontact.com>
>>>> wrote:
>>>>
>>>>> Out of curiosity, have you tried just emitting the response as a
>>>>> string/byte array vs. the whole response object?
>>>>>
>>>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <idan.frid@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One note: it's not BasicOutputCollector, but OutputCollector.
>>>>>>
>>>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:
>>>>>>
>>>>>>> I am making an asynchronous call to a web service from a bolt.
>>>>>>>
>>>>>>> I open a socket and retrieve the result asynchronously (using the
>>>>>>> external AsyncHttpClient library), and after that I emit to the next
>>>>>>> bolt.
>>>>>>>
>>>>>>> I asked around and read that if I synchronize on the outputCollector, it
>>>>>>> will make sure that all acks and callbacks are called from the same
>>>>>>> thread.
>>>>>>>
>>>>>>> However, after a load test I started to get this:
>>>>>>>
>>>>>>>
>>>>>>> java.lang.RuntimeException: java.lang.NullPointerException
>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>>     at clojure.lang.RT.intCast(RT.java:1087)
>>>>>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>>>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>>
>>>>>>> That's my bolt:
>>>>>>>
>>>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>>>>>         asyncHttpClient = new AsyncHttpClient();
>>>>>>>         outputCollector = collector;
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void execute(final Tuple tuple) {
>>>>>>>
>>>>>>>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>>>>>>>             @Override
>>>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>>>                ...
>>>>>>>                 emitTuple(response, tuple);
>>>>>>>                 return response;
>>>>>>>             }
>>>>>>>         });
>>>>>>>     }
>>>>>>>     // we are synchronizing basicOutputCollector
>>>>>>>     // because we have callbacks and we need to make sure all acks
>>>>>>>     // are called from the same thread
>>>>>>>     private void emitTuple(Response response, Tuple tuple) {
>>>>>>>         synchronized (outputCollector) {
>>>>>>>             outputCollector.emit(tuple, new Values(response));
>>>>>>>             outputCollector.ack(tuple);
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Any leads on this, please?
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>
>>>>>
>>>
>
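
A note on the threading question in the quoted messages: synchronizing on the
collector serializes the callbacks against each other, but the emit and ack
calls still run on the HTTP client's threads rather than on the bolt's own
executor thread. One commonly suggested alternative is to have the callback
only enqueue its result and let the executor thread do all emitting and acking.
The sketch below shows that hand-off without depending on Storm. The Collector
interface, the Completed class, and the String "anchor" are hypothetical
stand-ins (assumptions, not Storm's API) for OutputCollector and Tuple, and
nothing in this thread confirms that this change resolves the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SingleThreadEmitSketch {

    /** Hypothetical stand-in for the emit/ack calls used in the bolt above. */
    interface Collector {
        void emit(String anchor, String value);
        void ack(String anchor);
    }

    /** A finished async call: the input it belongs to and the value to emit. */
    static final class Completed {
        final String anchor;
        final String value;

        Completed(String anchor, String value) {
            this.anchor = anchor;
            this.value = value;
        }
    }

    // Thread-safe hand-off between callback threads and the executor thread.
    private final ConcurrentLinkedQueue<Completed> done = new ConcurrentLinkedQueue<>();
    private final Collector collector;

    SingleThreadEmitSketch(Collector collector) {
        this.collector = collector;
    }

    /** Runs on a callback thread: only enqueues, never touches the collector. */
    void onCompleted(String anchor, String value) {
        done.add(new Completed(anchor, value));
    }

    /** Runs only on the executor thread; returns how many results were emitted. */
    int drain() {
        int emitted = 0;
        Completed c;
        while ((c = done.poll()) != null) {
            collector.emit(c.anchor, c.value);
            collector.ack(c.anchor);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        final List<String> log = new ArrayList<>();
        Collector recording = new Collector() {
            @Override
            public void emit(String anchor, String value) {
                log.add("emit " + anchor + " -> " + value);
            }

            @Override
            public void ack(String anchor) {
                log.add("ack " + anchor);
            }
        };
        SingleThreadEmitSketch sketch = new SingleThreadEmitSketch(recording);

        // Simulate the HTTP client completing a request on its own thread.
        Thread callback = new Thread(() -> sketch.onCompleted("tuple-1", "response-1"));
        callback.start();
        callback.join();

        // Back on the "executor" thread: the only place the collector is used.
        int emitted = sketch.drain();
        System.out.println(emitted + " result(s) emitted: " + log);
    }
}
```

In a real bolt, drain() would be called from execute(), so a result that
completes after the last input tuple waits until the next call. Tick tuples are
one way to trigger a periodic drain, and the topology's message timeout would
need to allow for that delay.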
