storm-user mailing list archives

From Idan Fridman <idan.f...@gmail.com>
Subject Re: NullPointerException from deep inside Storm
Date Tue, 10 Feb 2015 18:27:41 GMT
Yes, I cut those from this post for the sake of simplicity as well. I can
paste the whole bolt; it just has some internal logic.
On Feb 10, 2015 8:14 PM, "Michael Rose" <michael@fullcontact.com> wrote:

> Your output fields declarer isn't invoked. Is it empty in your actual
> implementation as well?
>
> Perhaps you can post a gist with your full file?
>
> *Michael Rose*
> Senior Platform Engineer
> *Full*Contact | fullcontact.com
> m: +1.720.837.1357 | t: @xorlev
>
> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <idan.frid@gmail.com>
> wrote:
>
>> I actually created a POJO of my own and emitted it. I edited the code here
>> for the sake of simplicity. So you think I should try converting the object
>> into a byte array/string and then emit it? What about synchronizing the
>> OutputCollector? Does that make sense?
>> On Feb 10, 2015 8:00 PM, "Michael Rose" <michael@fullcontact.com> wrote:
>>
>>> Out of curiosity, have you tried just emitting the response as a
>>> string/byte array vs. the whole response object?
>>>
>>> *Michael Rose*
>>> Senior Platform Engineer
>>> *Full*Contact | fullcontact.com
>>> m: +1.720.837.1357 | t: @xorlev
>>>
>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <idan.frid@gmail.com>
>>> wrote:
>>>
>>>> One note: it's not BasicOutputCollector, but OutputCollector.
>>>>
>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:
>>>>
>>>>> I am making an async call to a web service from a bolt.
>>>>>
>>>>> I am opening a socket and retrieving the result asynchronously (using the
>>>>> external AsyncHttpClient library), and after that I am emitting to the
>>>>> next bolt.
>>>>>
>>>>> I asked and read that if I synchronize on the outputCollector, it will
>>>>> make sure that all acks and callbacks are called from the same
>>>>> thread.
>>>>>
>>>>> However, after a load test I started to get this:
>>>>>
>>>>>
>>>>> java.lang.RuntimeException: java.lang.NullPointerException
>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.NullPointerException
>>>>>     at clojure.lang.RT.intCast(RT.java:1087)
>>>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>
>>>>> That's my bolt:
>>>>>
>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>
>>>>> ...
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>>>         asyncHttpClient = new AsyncHttpClient();
>>>>>         outputCollector = collector;
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void execute(final Tuple tuple) {
>>>>>
>>>>>         asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
>>>>>             @Override
>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>                ...
>>>>>                 emitTuple(response, tuple);
>>>>>                 return response;
>>>>>             }
>>>>>         });
>>>>>     }
>>>>>     // we are synchronizing basicOutputCollector because we have callbacks
>>>>>     // and we need to make sure all acks are called from the same thread
>>>>>     private void emitTuple(Response response, Tuple tuple) {
>>>>>         synchronized (outputCollector) {
>>>>>             outputCollector.emit(tuple, new Values(response));
>>>>>             outputCollector.ack(tuple);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Any leads on this, please?
>>>>>
>>>>> Thank you.
>>>>>
>>>>
>>>
>
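
A pattern sometimes suggested for bolts that call asynchronous clients is to keep every collector call on the executor thread: the HTTP callback only enqueues the finished result, and the thread that runs execute() drains the queue and does the emit and ack. The thread does not confirm that this is the cause of, or the fix for, the NullPointerException above, so the following is only a minimal sketch under that assumption. It deliberately avoids the Storm and AsyncHttpClient libraries so it can run on its own: Collector, Completed, HandoffSketch, onCompleted, drain and runDemo are all hypothetical names, and tuples are reduced to string ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {

    /** Hypothetical stand-in for Storm's OutputCollector; NOT the real API. */
    interface Collector {
        void emit(String anchor, String value);
        void ack(String anchor);
    }

    /** A finished async call: the input tuple (here just an id) and the response body. */
    static final class Completed {
        final String tupleId;
        final String body;
        Completed(String tupleId, String body) {
            this.tupleId = tupleId;
            this.body = body;
        }
    }

    // Thread-safe queue: callback threads add, the owning thread polls.
    private final ConcurrentLinkedQueue<Completed> done = new ConcurrentLinkedQueue<>();
    private final Collector collector;

    HandoffSketch(Collector collector) {
        this.collector = collector;
    }

    /** Called from a callback thread: only enqueue, never touch the collector. */
    void onCompleted(String tupleId, String body) {
        done.add(new Completed(tupleId, body));
    }

    /** Called from the single owning thread; emits and acks everything queued so far. */
    int drain() {
        int drained = 0;
        Completed c;
        while ((c = done.poll()) != null) {
            collector.emit(c.tupleId, c.body);
            collector.ack(c.tupleId);
            drained++;
        }
        return drained;
    }

    /** Simulates `count` callbacks on a thread pool, then drains on the calling thread. */
    static List<String> runDemo(int count) {
        final List<String> log = new ArrayList<>(); // only touched by the draining thread
        final Thread owner = Thread.currentThread();
        final HandoffSketch bolt = new HandoffSketch(new Collector() {
            @Override
            public void emit(String anchor, String value) {
                if (Thread.currentThread() != owner) {
                    throw new IllegalStateException("emit called off the owner thread");
                }
                log.add("emit:" + anchor);
            }
            @Override
            public void ack(String anchor) {
                log.add("ack:" + anchor);
            }
        });
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < count; i++) {
            final String id = "t" + i;
            pool.submit(() -> bolt.onCompleted(id, "body-" + id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        bolt.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3));
    }
}
```

In a real bolt the drain step would presumably run at the start of execute(), which means completed responses wait until the next tuple arrives; a periodic tick tuple is one way to bound that delay. The queue carries the response body as a String rather than the whole Response object, in line with the suggestion quoted above.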
