storm-user mailing list archives

From Idan Fridman <idan.f...@gmail.com>
Subject NullPointerException from deep inside Storm
Date Tue, 10 Feb 2015 16:29:27 GMT
I am making an asynchronous call to a web service from a bolt.

I open a socket and retrieve the result asynchronously (using the external
AsyncHttpClient library), and after that I emit to the next bolt.

I asked around and read that if I synchronize on the outputCollector, it will
make sure that all acks and callbacks are called from the same thread.

However, after a load test I started to get this:


java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at clojure.lang.RT.intCast(RT.java:1087)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
    at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)

That's my bolt:

public class AsyncBolt extends BaseRichBolt {

...

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        asyncHttpClient = new AsyncHttpClient();
        outputCollector = collector;
    }


    @Override
    public void execute(final Tuple tuple) {

        asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response response) throws Exception {
               ...
                emitTuple(response, tuple);
                return response;
            }
        });
    }
    // We synchronize on outputCollector because we have callbacks
    // and we need to make sure all acks are called from the same thread.
    private void emitTuple(Response response, Tuple tuple) {
        synchronized (outputCollector) {
            outputCollector.emit(tuple, new Values(response));
            outputCollector.ack(tuple);
        }
    }
}

Any leads on this, please?

Thank you.
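
A note on the synchronized block in emitTuple: synchronized only guarantees
that one thread at a time enters the block. It does not change which thread
runs the code, so the callbacks still reach the collector from the HTTP
client's threads rather than from the bolt's own thread. A commonly suggested
alternative is to have the callbacks only enqueue their results and let a
single owning thread drain the queue and do every emit and ack. The sketch
below models that hand-off with plain java.util.concurrent classes only. It
uses no Storm or AsyncHttpClient classes, and HandOffSketch,
SingleThreadCollector, emitAndAck, drain and runDemo are hypothetical names
made up for illustration.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stand-alone model of the hand-off pattern; no Storm classes are used. */
public class HandOffSketch {

    /** Hypothetical stand-in for a collector that only one thread may use. */
    static final class SingleThreadCollector {
        // Records the name of every thread that ever called emitAndAck.
        final Set<String> callerThreads = ConcurrentHashMap.newKeySet();
        // Deliberately unsynchronized: only the owning thread touches it.
        int emitted = 0;

        void emitAndAck(String response) {
            callerThreads.add(Thread.currentThread().getName());
            emitted++;
        }
    }

    // Thread-safe queue shared between callback threads and the owning thread.
    private final Queue<String> completed = new ConcurrentLinkedQueue<>();
    private final SingleThreadCollector collector = new SingleThreadCollector();

    /** Called from callback threads: enqueue only, never touch the collector. */
    void onCompleted(String response) {
        completed.add(response);
    }

    /** Called from the owning thread: emit and ack everything queued so far. */
    int drain() {
        int drained = 0;
        String response;
        while ((response = completed.poll()) != null) {
            collector.emitAndAck(response);
            drained++;
        }
        return drained;
    }

    /**
     * Simulates callbackThreads threads, each completing perThread requests.
     * Returns {emitted count, number of distinct threads that used the collector}.
     */
    static int[] runDemo(int callbackThreads, int perThread) {
        HandOffSketch bolt = new HandOffSketch();
        ExecutorService pool = Executors.newFixedThreadPool(callbackThreads);
        CountDownLatch done = new CountDownLatch(callbackThreads);
        for (int t = 0; t < callbackThreads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    bolt.onCompleted("response-" + id + "-" + i);
                }
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        bolt.drain(); // runs on the calling thread only
        return new int[] { bolt.collector.emitted, bolt.collector.callerThreads.size() };
    }

    public static void main(String[] args) {
        int[] result = runDemo(4, 250);
        System.out.println("emitted=" + result[0] + ", collector threads=" + result[1]);
    }
}
```

In a real bolt, drain() would presumably be called from execute(), possibly
combined with tick tuples so that queued results are flushed even when no new
input arrives. That part is an assumption and has not been tried against this
topology.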
