Return-Path: X-Original-To: apmail-storm-user-archive@minotaur.apache.org Delivered-To: apmail-storm-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 71CB110598 for ; Tue, 10 Feb 2015 16:29:56 +0000 (UTC) Received: (qmail 42100 invoked by uid 500); 10 Feb 2015 16:29:55 -0000 Delivered-To: apmail-storm-user-archive@storm.apache.org Received: (qmail 42051 invoked by uid 500); 10 Feb 2015 16:29:55 -0000 Mailing-List: contact user-help@storm.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@storm.apache.org Delivered-To: mailing list user@storm.apache.org Received: (qmail 42041 invoked by uid 99); 10 Feb 2015 16:29:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2015 16:29:55 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of idan.frid@gmail.com designates 74.125.82.180 as permitted sender) Received: from [74.125.82.180] (HELO mail-we0-f180.google.com) (74.125.82.180) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2015 16:29:48 +0000 Received: by mail-we0-f180.google.com with SMTP id m14so34751133wev.11 for ; Tue, 10 Feb 2015 08:29:27 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:date:message-id:subject:from:to:content-type; bh=n199ofye8eizlqNSI2GLNQ8a8+MkqODvzpPOC/ZaRaM=; b=OtC7pf1GzKLKUZfK5vKWtI0r+GLuIwmKcUoJwRWwX+NZ5i0LeboKI/W0pahTe7/J3e aHdfpFofr7ZB69SPvcQyHeYIys8uiMR2ADtAgklW3lpGGOVmZFbcrAYKv4sPvzCGfAnT W3YdNuEcqd3+AMmI6eKzMRtsPWHpNH6ibOPfAIVNyPMt++D+gUoUUxSKpa9o+bVP/YO/ Yao6eow9KRkbcOJSGYQo3wBnlkA4Vwm+qZhsDX3oFSMpvbn/3gFk/yNlb/48iBid3ArW yDcXzYqv2iFKqjlrR5b1HQseL3bQC8dEqgL6sI2HF9F+uuTsTKUUyXfbvynYLRmtb7h+ TMDA== MIME-Version: 1.0 X-Received: by 10.180.89.210 with SMTP id bq18mr47242313wib.45.1423585767336; Tue, 10 Feb 2015 08:29:27 -0800 (PST) Received: by 10.216.227.12 with HTTP; Tue, 10 Feb 2015 08:29:27 -0800 (PST) Date: Tue, 10 Feb 2015 18:29:27 +0200 Message-ID: Subject: NullPointerException from deep inside Storm From: Idan Fridman To: user@storm.apache.org Content-Type: multipart/alternative; boundary=e89a8f3ba5e3f8327a050ebe6377 X-Virus-Checked: Checked by ClamAV on apache.org --e89a8f3ba5e3f8327a050ebe6377 Content-Type: text/plain; charset=UTF-8 I am opening async call to a webservice from a bolt. I'am opening socket and retrieving the result asynchronous(using external AsycHttpClient library) and after that I am emitting to the next bolt I asked and read that if I synchronized the outputCollector it will make sure all that all acks and callbacks will be called from the same Thread. However after load-test I started to get this: java.lang.RuntimeException: java.lang.NullPointerException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at clojure.lang.RT.intCast(RT.java:1087) at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) Thats my bolt: public class AsyncBolt extends BaseRichBolt { ... @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { asyncHttpClient = new AsyncHttpClient(); outputCollector = collector; } @Override public void execute(final Tuple tuple) { asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler() { @Override public Response onCompleted(Response response) throws Exception { ... emitTuple(response, tuple); return response; } }); } //we are synchronizing basicOutputCollector // because we have callbacks and we need to make sure all acks are called from the same thread private void emitTuple(Response response, Tuple tuple) { synchronized (outputCollector) { outputCollector.emit(tuple, new Values(response)); outputCollector.ack(tuple); } } } Please any leads about that? Thank you. --e89a8f3ba5e3f8327a050ebe6377 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
I am opening async call to a webservice f= rom a bolt.

I'am openi= ng socket and retrieving the result asynchronous(using external AsycHttpCli= ent library) and after that I am emitting to the next bolt

I asked and read that if I=C2=A0synchroniz= ed the=C2=A0outputCollector it will make sure all that all acks and callbac= ks will be called from the same Thread.

However after load-test I started to get this:


java.lang.RuntimeException: java.lang.NullPointerException at backt= ype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128= ) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(Disrupto= rQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.in= voke(disruptor.clj:80) at backtype.storm.disruptor$consume_loop_STAR_$fn__1= 460.invoke(disruptor.clj:94) at backtype.storm.util$async_loop$fn__464.invo= ke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.r= un(Thread.java:745) Caused by: java.lang.NullPointerException at clojure.la= ng.RT.intCast(RT.java:1087) at backtype.storm.daemon.worker$mk_transfer_fn$= fn__3549.invoke(worker.clj:129) at backtype.storm.daemon.executor$start_bat= ch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) at b= acktype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:5= 8)=C2=A0

Thats my bolt:

public class AsyncBolt extends BaseRichBolt {<= /div>
=C2=A0 =C2=A0
...

=C2=A0 =C2=A0 @Override
=C2=A0 =C2=A0 public void declareOutputFields(OutputFieldsDeclar= er declarer) {
=C2=A0 =C2=A0 }
=

=C2=A0 =C2=A0 @Overri= de
=C2=A0 =C2=A0 public void prepare(Map stormConf, T= opologyContext context, OutputCollector collector) {
= =C2=A0 =C2=A0 =C2=A0 =C2=A0 asyncHttpClient =3D new AsyncHttpClient();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 outputCollector =3D collector= ;
=C2=A0 =C2=A0 }


=C2=A0 =C2=A0 @Override
=C2=A0 =C2=A0 public void execute(final Tuple tuple) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0
=C2=A0 =C2= =A0 =C2=A0 =C2=A0 asyncHttpClient.preparePost(url).execute(new AsyncComplet= ionHandler<Response>() {
=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 @Override
=C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 public Response onCompleted(Response response) throws = Exception {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0...
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 emitTuple(response, tuple);
=C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return response;
=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 });
=C2=A0 =C2= =A0 }
=C2=A0 =C2=A0 //we are synchronizing basicOutpu= tCollector
=C2=A0 =C2=A0 // because we have callbacks= and we need to make sure all acks are called from the same thread
=C2=A0 =C2=A0 private void emitTuple(Response response, Tuple= tuple) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 synchronized (o= utputCollector) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 outputCollector.emit(tuple, new Values(response));
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 outputCollector.ack(tuple);<= /div>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }
= =C2=A0 =C2=A0 }
}

Please any leads about that?

Thank you.
--e89a8f3ba5e3f8327a050ebe6377--