storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Idan Fridman <idan.f...@gmail.com>
Subject Re: NullPointerException from deep inside Storm
Date Tue, 10 Feb 2015 16:30:38 GMT
One note: it's not BasicOutputCollector. but OutputCollector

2015-02-10 18:29 GMT+02:00 Idan Fridman <idan.frid@gmail.com>:

> I am opening async call to a webservice from a bolt.
>
> I'am opening socket and retrieving the result asynchronous(using external
> AsycHttpClient library) and after that I am emitting to the next bolt
>
> I asked and read that if I synchronized the outputCollector it will make
> sure all that all acks and callbacks will be called from the same Thread.
>
> However after load-test I started to get this:
>
>
> java.lang.RuntimeException: java.lang.NullPointerException at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException at
> clojure.lang.RT.intCast(RT.java:1087) at
> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
> at
> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
> at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>
> Thats my bolt:
>
> public class AsyncBolt extends BaseRichBolt {
>
> ...
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
>
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
>         asyncHttpClient = new AsyncHttpClient();
>         outputCollector = collector;
>     }
>
>
>     @Override
>     public void execute(final Tuple tuple) {
>
>         asyncHttpClient.preparePost(url).execute(new
> AsyncCompletionHandler<Response>() {
>             @Override
>             public Response onCompleted(Response response) throws
> Exception {
>                ...
>                 emitTuple(response, tuple);
>                 return response;
>             }
>         });
>     }
>     //we are synchronizing basicOutputCollector
>     // because we have callbacks and we need to make sure all acks are
> called from the same thread
>     private void emitTuple(Response response, Tuple tuple) {
>         synchronized (outputCollector) {
>             outputCollector.emit(tuple, new Values(response));
>             outputCollector.ack(tuple);
>         }
>     }
> }
>
> Please any leads about that?
>
> Thank you.
>

Mime
View raw message