storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Idan Fridman <idan.f...@gmail.com>
Subject Re: NullPointerException from deep inside Storm
Date Wed, 11 Feb 2015 08:18:19 GMT
Hi,
Here is the almost full bolt I am using which causing that exception:


public class AsyncBolt extends BaseRichBolt {


    private AsyncHttpClient asyncHttpClient;
    private OutputCollector outputCollector;



    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("pushMessageResponse"));
    }



    @Override
    public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
        asyncHttpClient = new AsyncHttpClient();
        outputCollector = collector;

    }


    @Override
    public void execute(final Tuple tuple) {
        final PushMessageRequestDTO pushMessageRequestDTO =
(PushMessageRequestDTO) tuple.getValueByField("pushMessage");
        String url = "some.url";
        asyncHttpClient.preparePost(url).execute(new
AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response response) throws Exception
{
                PushMessageResponseDTO pushMessageResponseDTO = new
PushMessageResponseDTO(pushMessageRequestDTO);
                emitTuple(pushMessageResponseDTO, tuple);
                return response;
            }
        });
    }
    //we are synchronizing basicOutputCollector
    // because we have callbacks and we need to make sure all acks are
called from the same thread
    private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO,
Tuple tuple) {
        synchronized (outputCollector) {
            outputCollector.emit(tuple, new Values(pushMessageResponseDTO));
            outputCollector.ack(tuple);
        }
    }
}



2015-02-10 20:27 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:

> Yes I cutted those from this post for the sake simplicity aswell. I can
> past the whole bolt it just has some intern logic
> On Feb 10, 2015 8:14 PM, "Michael Rose" <michael@fullcontact.com> wrote:
>
>> Your output fields declarer isn't invoked. Is it empty in your actual
>> implementation as well?
>>
>> Perhaps you can post a gist with your full file?
>>
>> *Michael Rose*
>> Senior Platform Engineer
>> *Full*Contact | fullcontact.com
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>> m: +1.720.837.1357 | t: @xorlev
>>
>>
>> All Your Contacts, Updated and In One Place.
>> Try FullContact for Free
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>
>> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <idan.frid@gmail.com>
>> wrote:
>>
>>> I actually created pojo of mine and emitted it. I edited here the code
>>> for the sake of simplicity. So you think I should try convert the Object
>>> into byte/string and then emmit? What about the outputcollector
>>> synchronizing ? Does it make sense?
>>> On Feb 10, 2015 8:00 PM, "Michael Rose" <michael@fullcontact.com> wrote:
>>>
>>>> Out of curiosity, have you tried just emitting the response as a
>>>> string/byte array vs. the whole response object?
>>>>
>>>> *Michael Rose*
>>>> Senior Platform Engineer
>>>> *Full*Contact | fullcontact.com
>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>> m: +1.720.837.1357 | t: @xorlev
>>>>
>>>>
>>>> All Your Contacts, Updated and In One Place.
>>>> Try FullContact for Free
>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>>
>>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <idan.frid@gmail.com>
>>>> wrote:
>>>>
>>>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>>>
>>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:
>>>>>
>>>>>> I am opening async call to a webservice from a bolt.
>>>>>>
>>>>>> I'am opening socket and retrieving the result asynchronous(using
>>>>>> external AsycHttpClient library) and after that I am emitting to
the next
>>>>>> bolt
>>>>>>
>>>>>> I asked and read that if I synchronized the outputCollector it will
>>>>>> make sure all that all acks and callbacks will be called from the
same
>>>>>> Thread.
>>>>>>
>>>>>> However after load-test I started to get this:
>>>>>>
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>> at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>> at
>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>> at
>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.NullPointerException at
>>>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>>> at
>>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>>> at
>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>
>>>>>> Thats my bolt:
>>>>>>
>>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>>
>>>>>> ...
>>>>>>
>>>>>>     @Override
>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer)
{
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void prepare(Map stormConf, TopologyContext context,
>>>>>> OutputCollector collector) {
>>>>>>         asyncHttpClient = new AsyncHttpClient();
>>>>>>         outputCollector = collector;
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void execute(final Tuple tuple) {
>>>>>>
>>>>>>         asyncHttpClient.preparePost(url).execute(new
>>>>>> AsyncCompletionHandler<Response>() {
>>>>>>             @Override
>>>>>>             public Response onCompleted(Response response) throws
>>>>>> Exception {
>>>>>>                ...
>>>>>>                 emitTuple(response, tuple);
>>>>>>                 return response;
>>>>>>             }
>>>>>>         });
>>>>>>     }
>>>>>>     //we are synchronizing basicOutputCollector
>>>>>>     // because we have callbacks and we need to make sure all acks
>>>>>> are called from the same thread
>>>>>>     private void emitTuple(Response response, Tuple tuple) {
>>>>>>         synchronized (outputCollector) {
>>>>>>             outputCollector.emit(tuple, new Values(response));
>>>>>>             outputCollector.ack(tuple);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Please any leads about that?
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>
>>>>
>>

Mime
View raw message