From: "P. Taylor Goetz" <ptgoetz@gmail.com>
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions
Date: Fri, 7 Feb 2014 13:03:17 -0500

Hey Adrian,

I ran the CassandraMapStateTest against Cassandra 1.2.x and 2.0.x and was unable to reproduce what you're seeing. The difference is that I'm not using Scala, which may or may not have something to do with it.

Can you try running the CassandraMapStateTest against your Cassandra instance?
To prevent the test from starting an in-process Cassandra server, comment out the following line:

    SingletonEmbeddedCassandra.getInstance();

and change the host:port combos to match your environment if you're not running Cassandra on localhost.

Sorry I can't be of more help. Is there anything in your logs leading up to the NPE that might indicate some other problem?

- Taylor

On Feb 7, 2014, at 10:44 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

> It's Cassandra 2.0
>
> From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
> Sent: February-07-14 10:11 AM
> To: user@storm.incubator.apache.org
> Subject: Re: Svend's blog - several questions
>
> Adrian,
>
> Quick question… What version of Cassandra are you running this against?
>
> - Taylor
>
> On Feb 7, 2014, at 9:29 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
>
> Hi Taylor,
> Thanks for your reply.
> I highlighted the line where the NPE occurs, but I can certainly also give a full stack trace… it's a bit lengthy.
> I'm still not sure what schema the example code I posted expects to see in the database… I think that's the reason, but it's only speculation at this point.
> 3686 [Thread-21] INFO  com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - AddHost: 10.10.6.80
> 4408 [Thread-21] INFO  backtype.storm.daemon.executor - Prepared bolt b-0:(4)
> 4933 [Thread-21] ERROR com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
> com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
>     at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:201)
>     at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:61)
>     at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:28)
>     at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
>     at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
>     at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:253)
>     at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4.execute(ThriftColumnFamilyQueryImpl.java:535)
>     at com.hmsonline.storm.cassandra.trident.CassandraMapState.multiGet(CassandraMapState.java:219)
>     at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:35)
>     at storm.trident.state.map.TransactionalMap.multiUpdate(TransactionalMap.java:38)
>     at storm.trident.state.map.CachedBatchReadsMap.multiUpdate(CachedBatchReadsMap.java:35)
>     at storm.trident.state.map.SnapshottableMap.multiUpdate(SnapshottableMap.java:25)
>     at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:47)
>     at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:18)
>     at storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:81)
>     at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:135)
>     at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:235)
>     at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:268)
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:342)
>     at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
>     at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NullPointerException
>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
>     at com.google.common.collect.ImmutableClassToInstanceMap.getInstance(ImmutableClassToInstanceMap.java:147)
>     at com.netflix.astyanax.model.AbstractComposite.serializerForComparator(AbstractComposite.java:301)
>     at com.netflix.astyanax.model.AbstractComposite.getSerializer(AbstractComposite.java:323)
>     at com.netflix.astyanax.model.AbstractComposite.deserialize(AbstractComposite.java:680)
>     at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:26)
>     at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:8)
>     at com.netflix.astyanax.thrift.model.ThriftRowsListImpl.<init>(ThriftRowsListImpl.java:42)
>     at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:549)
>     at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:538)
>     at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:56)
>     ... 27 more
> 4969 [Thread-21] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
>     at com.hmsonline.storm.cassandra.trident.CassandraMapState.multiGet(CassandraMapState.java:222)
>     [... same Trident/executor frames as in the trace above ...]
>     ... 6 more
> Caused by: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
>     [... same Astyanax frames as above, ending at CassandraMapState.multiGet(CassandraMapState.java:219) ...]
>     ... 21 more
> Caused by: java.lang.NullPointerException
>     [... same frames as above, from Preconditions.checkNotNull down ...]
>     ... 27 more
> 4985 [Thread-21] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
>     [... identical stack trace to the "Async loop died!" error above ...]
> 5332 [Thread-21] INFO backtype.storm.util - Halting process: ("Worker died")
>
> From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
> Sent: February-06-14 8:23 PM
> To: user@storm.incubator.apache.org
> Subject: Re: Svend's blog - several questions
>
> Adrian,
>
> Can you provide a full stack trace? There's not enough there to see where the NPE is originating.
>
> - Taylor
>
> On Feb 6, 2014, at 8:08 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
>
> Thanks for pointing it out!
> That test file is extremely helpful!
>
> Hi all,
> I'm using storm-cassandra (mentioned in this thread).
> I keep getting NPE:
>
> 10241 [Thread-21] ERROR com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor - com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=23(24), attempts=1]java.lang.NullPointerException
> com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=23(24), attempts=1]java.lang.NullPointerException
>
> The code I use is the following (for local, not cluster, run):
>
>     val options = new chat.CassandraMapState.Options[TransactionalValue[_]]();
>     options.columnFamily = "transactional";
>     val clusterContext = chat.AstyanaxUtil.newClusterContext("10.10.6.80:9160");
>     chat.AstyanaxUtil.createColumnFamily(clusterContext, "test", "transactional", "UTF8Type", "UTF8Type", "UTF8Type");
>
>     val cassandraStateFactory: StateFactory = CassandraMapState.transactional(options)
>
>     val spout = new FixedBatchSpout(new Fields("sentence"), 3,
>       new Values("the cow jumped over the moon"),
>       new Values("the man went to the store and bought some candy"),
>       new Values("four score and seven years ago"),
>       new Values("how many apples can you eat"))
>     spout.setCycle(true)
>
>     val wordCounts: TridentState = tridentBuilder.newStream("spout1", spout)
>       .each(new Fields("sentence"), new Split(), new Fields("word"))
>       .groupBy(new Fields("word"))
>       .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))
>
>     val cluster = new LocalCluster();
>     val config = new Config();
>     val clientConfig = new util.HashMap[String, Object]();
>     clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "10.10.6.80:9160");
>     clientConfig.put(StormCassandraConstants.CASSANDRA_STATE_KEYSPACE, "test");
>     config.put("cassandra.config", clientConfig); // must match the KEY from Options.clientConfigKey
>     config.setMaxSpoutPending(100);
>     config.setMaxSpoutPending(25);
>     cluster.submitTopology("test", config, tridentBuilder.build());
>
> This code creates keyspace test and column family transactional with a few other things.
> So the connection to Cassandra is fine.
> The code that throws the exception is in the multiGet method:
>
>     RowSliceQuery query = this.keyspace.prepareQuery(cf).getKeySlice(keyNames);
>     Rows result = null;
>     try {
>         result = query.execute().getResult();   // <--------- this gives NPE
>     } catch (ConnectionException e) {
>         // TODO throw a specific error.
>         throw new RuntimeException(e);          // <---- caught here
>     }
>
> The keys passed to multiGet are: [[moon], [bought], [the], [some], [score], [cow], [went], [and], [to], [seven], [over], [store], [years], [jumped], [candy], [four], [ago], [man]]
>
> Is this exception thrown because of a schema mismatch? If so, what column names/schema am I supposed to use to run this example code?
>
> Thanks so much.
> I'm getting close!!
>
> -A
>
> From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
> Sent: February-06-14 2:10 PM
> To: user@storm.incubator.apache.org
> Subject: Re: Svend's blog - several questions
>
> It's in Maven Central:
>
>     <dependency>
>         <groupId>com.hmsonline</groupId>
>         <artifactId>storm-cassandra</artifactId>
>         <version>0.4.0-rc4</version>
>     </dependency>
>
> - Taylor
>
> On Feb 6, 2014, at 2:05 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
>
> Hi Taylor,
> I will give this a try. What is the maven repository for it?
>
> I've found several ones:
> "com.hmsonline" % "hms-cassandra-rest" % "1.0.0"
> "com.github.ptgoetz" % "storm-cassandra" % "0.1.2"
>
> And now
> "com.hmsonline" % "storm-cassandra" % "0.4.0-rc4"
> from
> http://mvnrepository.com/artifact/com.hmsonline/storm-cassandra
>
> -A
>
> From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
> Sent: February-06-14 11:31 AM
> To: user@storm.incubator.apache.org
> Subject: Re: Svend's blog - several questions
>
> Thanks Svend. Good explanation.
>
> Adrian,
>
> The storm-cassandra documentation could be better in terms of explaining how to use the MapState implementation, but there's a unit test that demonstrates basic usage:
>
> https://github.com/hmsonline/storm-cassandra/blob/master/src/test/java/com/hmsonline/storm/cassandra/bolt/CassandraMapStateTest.java
>
> Basically, you just need to point it to a keyspace + column family where the state data will be stored.
>
> - Taylor
>
> On Feb 6, 2014, at 3:25 AM, Svend Vanderveken <svend.vanderveken@gmail.com> wrote:
>
> The logic of a map state is to keep a "state" somewhere. You can think of a Storm state as a big map of key-values: the keys come from the groupBy and the values are the result of the aggregations. Conceptually, when your topology is talking to a State, you can imagine it's actually talking to a big HashMap (only there's a DB behind it for persistence, plus opaque logic for error handling).
>
> Most of the time, I try not to have any other part of my product depend on the location or structure of the data stored in the DB, so I do not really need to be super specific about the storage structure: that is up to the IBackingMap implementation I am delegating to. Read or write access to the DB is done via the Storm primitive, not by accessing the DB directly. Don't forget there's also the stateQuery primitive you can use to read your stored state from another place.
>
> There are ways to configure column families and column names; have a look at the super clear storm-cassandra doc to see how to do that with this implementation: https://github.com/hmsonline/storm-cassandra
>
> My blog post of last year is indeed illustrating a full implementation including an in-house IBackingMap implementation. I think that approach is sometimes needed when we want fine-grained control over things. I should have made clearer that this is not necessarily the default approach.
>
> I hope this makes sense now.
>
> S
>
> On Wed, Feb 5, 2014 at 11:15 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
> Thank you Svend and Adam.
>
> Svend, I'm your reader and that tutorial is very useful. I've been spending lots of time looking at the code and that blog post.
>
> BTW I initially thought you were adding the nulls incorrectly in Q3 below, but now I see you're doing it correctly.
>
> I have a follow-up question:
> Why do you say that "we do not implement multiget/multiput, we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend"?
> I thought that I had to rewrite an IBackingMap implementation to correspond to the tuples and schema I have in my database. I use Cassandra.
> I started with com.hmsonline.storm.cassandra.trident.CassandraState or trident.cassandra.CassandraState (they both implement IBackingMap) and I replaced multiGet and multiPut to match my db schema. (Well, I'm trying to.)
>
> You are saying I can use CassandraState as it is? :D
> If so, how would it even know what table my data should go into? It allows you to set the column family and a few other things where state will be saved (keyspace, column family, replication, rowKey). By state I think it means something like a txID (transaction ID). Do you by any chance know what this state that CassandraState is saving is?
> So as you can tell I have no idea how to use CassandraState.
>
> Thanks again!
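The IBackingMap contract at issue in this exchange (multiGet must return a list of exactly the same size and order as its key list, with null at the index of any missing key) can be sketched without any Storm or Cassandra dependency. This is a hypothetical illustration of the contract, not storm-cassandra's actual code; the `HashMap` stands in for the database:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the IBackingMap multiGet/multiPut contract:
// the returned list has exactly the same size and order as the key list,
// with null at the index of any key absent from the backing store.
public class MultiGetSketch {
    private final Map<List<Object>, Long> store = new HashMap<>();

    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }

    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> result = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            // Map.get() returns null for absent keys, which keeps
            // result.get(i) aligned with keys.get(i) -- nulls land at the
            // missing key's index, never appended at the end.
            result.add(store.get(key));
        }
        return result;
    }

    public static void main(String[] args) {
        MultiGetSketch state = new MultiGetSketch();
        // Keys are List<Object> because a groupBy may use several fields;
        // a single-field groupBy yields single-element keys.
        state.multiPut(
            Arrays.asList(Arrays.<Object>asList("the"), Arrays.<Object>asList("cow")),
            Arrays.asList(3L, 1L));
        List<Long> got = state.multiGet(
            Arrays.asList(Arrays.<Object>asList("the"),
                          Arrays.<Object>asList("moon"),
                          Arrays.<Object>asList("cow")));
        System.out.println(got);
    }
}
```

Here the missing key "moon" yields a null at index 1 of the result, keeping keys and results aligned, which is the point of Q3 in the discussion below.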
> -A
>
> From: Svend Vanderveken [mailto:svend.vanderveken@gmail.com]
> Sent: February-05-14 2:56 PM
> To: user@storm.incubator.apache.org
> Subject: Re: Svend's blog - several questions
>
> On Wed, Feb 5, 2014 at 6:22 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
> I've read Svend's blog [http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/] multiple times and I have a few questions.
>
> So you are my reader! Great :D
> (You can post your questions on the blog itself; I'm more likely to spot them there.)
>
> "Because we did a groupBy on one tuple field, each List contains here one single String: the correlationId. Note that the list we return must have exactly the same size as the list of keys, so that Storm knows what period corresponds to what key. So for any key that does not exist in DB, we simply put a null in the resulting list."
>
> Q1: Do the db keys come only from groupBy?
>
> Yes, the key values arriving in the multiget are the field values by which we are grouping:
> do groupBy(new Fields("color")) and you get things like "blue", "green", "flowerly romantic red"...
>
> Q2: Can you do groupBy on multiple keys, like .groupBy("name").groupBy("id")?
>
> Yes, the syntax is like this:
>
> groupBy(new Fields("name", "id"))
>
> That's the reason the keys in the multiget are List<Object> and not simply Object. We receive them in the order they were specified in the topology definition.
>
> Q3: When we add null we keep the size of the results list the same as the keys list, but I don't understand how we make sure that key(3) points to the correct result(3). After all, we're adding nulls at the end of the result list, not intermittently. I.e.: if key(1) does not have an entry in db, and key size is 5, we add null to the last position in results, not to results(1). This doesn't preserve consistency/order, so key(1) now gives result(1), which is not null as it should be. Is the code incorrect ... or is the explanation on Svend's blog incorrect?
>
> The order should indeed be respected, so if the strategy for handling a DB error in a multi-get is to put nulls, then they should indeed be at the index corresponding to the problematic key. Is there part of my toy project code that is padding nulls at the end? If so that's indeed a bug; please let me know where (or better, fork and send me a pull request).
>
> Note that I'm not particularly recommending putting nulls in case of unrecoverable errors in a multi-get; that's actually a simplistic way of handling the error. The contract with Storm is either to fail or to return a list of the correct size in the correct order. The data itself and its semantics are up to the topology implementation, i.e. up to us.
>
> Moving on,
> "Once this is loaded Storm will present the tuples having the same correlation ID one by one to our reducer, the PeriodBuilder"
>
> Q4: Does Trident/Storm call the reducer after calling multiGet and before calling multiPut?
>
> Yes.
>
> Q5: What params (and their types) are passed to the reducer and what parameters should it emit so they can go into multiGet?
>
> The reducer is called iteratively: it starts with the state found in DB (returned by the multiget) and the first grouped tuple, then the second, then the third... until the last tuple. The return value of the last call of the reducer is what is provided to the multiput, for the same key as the multiget.
>
> "Reduce" is actually a very common pattern in functional programming, which we Java programmers are sometimes less aware of. Look up some general doc on "reduce"; the Storm approach to it is very traditional, i.e. Storm has defined the "reduce" primitive exactly the way many other tools define it.
>
> Q6: The first time the program is run the database is empty and multiGet will return nothing. Does the reducer need to take care and make sure to insert for the first time, as opposed to updating a value? I do see that the reducer (TimelineUpdater) checks for nulls and I'm guessing this is the reason why it does so.
>
> Exactly.
>
> That's also why returning null in case of error in the multiget is questionable and probably not what you would systematically do: it is equivalent to saying "there's garbage in persistence for that key, so let's just consider there's nothing". The proper thing to do depends on the task at hand, but such an error in multiget is often a symptom that we stored garbage in persistence in the past due to some other problem, and it's too late to correct it now.
>
> Last thing: most of the time we do not implement multiget/multiput ourselves; we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend.
>
> Q7:
> Can someone explain what these mean:
> .each (I've seen this used even consecutively: .each(..).each(..) )
> .newStream
> .newValuesStream
> .persistAggregate
>
> I think they are all detailed here: https://github.com/nathanmarz/storm/wiki/Trident-API-Overview
>
> I am unable to find javadocs with documentation for the method signatures.
> These javadocs don't help much: http://nathanmarz.github.io/storm/doc/storm/trident/Stream.html
>
> Q8:
> Storm has ack/fail; does Trident handle that automatically?
>
> Yes, although you can also explicitly trigger errors. Look up my next blog: error handling in Storm Trident.
>
> Q9: Has anyone tried Spark?
= http://spark.incubator.apache.org/streaming/ > I'm wondering if anyone has tried it because I'm thinking of ditching = storm and moving to that. > It seems much much much better documented. > =20 > =20 > Spark looks cool I've not played with it yet, no. Go ahead, keep us = posted what you find out! > =20 > =20 > =20 > Lots of questions I know. Thanks for reading! > =20 > =20 > and you :D > =20 > =20 > -Adrian > =20 > =20 > =20 > Svend --Apple-Mail=_013D8318-E196-4751-AC46-12B746551839 Content-Transfer-Encoding: quoted-printable Content-Type: text/html; charset=windows-1252 Hey = Ardrian,

I ran the CassandraMapStateTest against Cassandra 1.2.x and 2.0.x and was unable to reproduce what you're seeing. The difference is that I'm not using Scala, which may or may not have something to do with it.

Can you try running the CassandraMapStateTest against your Cassandra instance?

To prevent the test from starting an in-process Cassandra server, comment out the following line:

SingletonEmbeddedCassandra.getInstance();

and change the host:port combos to match your environment if you're not running Cassandra on localhost.

Sorry I can't be of more help. Is there anything in your logs leading up to the NPE that might indicate some other problem?

- Taylor
On Feb 7, 2014, at 10:44 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

It's Cassandra 2.0

From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
Sent: February-07-14 10:11 AM
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions

Adrian,

Quick question… What version of Cassandra are you running this against?

- Taylor

On Feb 7, 2014, at 9:29 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

Hi Taylor,
Thanks for your reply.
I highlighted the line where the NPE occurs, but I can certainly also give a full stack trace… it's a bit lengthy.
I'm still not sure what schema the example code I posted expects to see in the database… I think that's the reason for it, but it's only speculation at this point.

3686 [Thread-21] INFO  com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor  - AddHost: 10.10.6.80
4408 [Thread-21] INFO  backtype.storm.daemon.executor  - Prepared bolt b-0:(4)
4933 [Thread-21] ERROR com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor  - com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
        at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:201)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:61)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:28)
        at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
        at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
        at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:253)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4.execute(ThriftColumnFamilyQueryImpl.java:535)
        at com.hmsonline.storm.cassandra.trident.CassandraMapState.multiGet(CassandraMapState.java:219)
        at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:35)
        at storm.trident.state.map.TransactionalMap.multiUpdate(TransactionalMap.java:38)
        at storm.trident.state.map.CachedBatchReadsMap.multiUpdate(CachedBatchReadsMap.java:35)
        at storm.trident.state.map.SnapshottableMap.multiUpdate(SnapshottableMap.java:25)
        at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:47)
        at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:18)
        at storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:81)
        at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:135)
        at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:235)
        at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:268)
        at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:342)
        at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
        at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
        at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
        at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
        at com.google.common.collect.ImmutableClassToInstanceMap.getInstance(ImmutableClassToInstanceMap.java:147)
        at com.netflix.astyanax.model.AbstractComposite.serializerForComparator(AbstractComposite.java:301)
        at com.netflix.astyanax.model.AbstractComposite.getSerializer(AbstractComposite.java:323)
        at com.netflix.astyanax.model.AbstractComposite.deserialize(AbstractComposite.java:680)
        at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:26)
        at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:8)
        at com.netflix.astyanax.thrift.model.ThriftRowsListImpl.<init>(ThriftRowsListImpl.java:42)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:549)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:538)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:56)
        ... 27 more
4969 [Thread-21] ERROR backtype.storm.util  - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
        at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
        at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
        at com.hmsonline.storm.cassandra.trident.CassandraMapState.multiGet(CassandraMapState.java:222)
        at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:35)
        at storm.trident.state.map.TransactionalMap.multiUpdate(TransactionalMap.java:38)
        at storm.trident.state.map.CachedBatchReadsMap.multiUpdate(CachedBatchReadsMap.java:35)
        at storm.trident.state.map.SnapshottableMap.multiUpdate(SnapshottableMap.java:25)
        at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:47)
        at storm.trident.state.map.MapCombinerAggStateUpdater.updateState(MapCombinerAggStateUpdater.java:18)
        at storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:81)
        at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:135)
        at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:235)
        at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:268)
        at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:342)
        at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
        at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
        ... 6 more
Caused by: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
        at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:201)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:61)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:28)
        at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
        at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
        at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:253)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4.execute(ThriftColumnFamilyQueryImpl.java:535)
        at com.hmsonline.storm.cassandra.trident.CassandraMapState.multiGet(CassandraMapState.java:219)
        ... 21 more
Caused by: java.lang.NullPointerException
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:192)
        at com.google.common.collect.ImmutableClassToInstanceMap.getInstance(ImmutableClassToInstanceMap.java:147)
        at com.netflix.astyanax.model.AbstractComposite.serializerForComparator(AbstractComposite.java:301)
        at com.netflix.astyanax.model.AbstractComposite.getSerializer(AbstractComposite.java:323)
        at com.netflix.astyanax.model.AbstractComposite.deserialize(AbstractComposite.java:680)
        at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:26)
        at com.netflix.astyanax.serializers.CompositeSerializer.fromByteBuffer(CompositeSerializer.java:8)
        at com.netflix.astyanax.thrift.model.ThriftRowsListImpl.<init>(ThriftRowsListImpl.java:42)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:549)
        at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$4$1.internalExecute(ThriftColumnFamilyQueryImpl.java:538)
        at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:56)
        ... 27 more
4985 [Thread-21] ERROR backtype.storm.daemon.executor  -
java.lang.RuntimeException: java.lang.RuntimeException: com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=284(284), attempts=1]java.lang.NullPointerException
5332 [Thread-21] INFO  backtype.storm.util  - Halting process: ("Worker died")

From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
Sent: February-06-14 8:23 PM
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions

Adrian,

Can you provide a full stack trace? There's not enough there to see where the NPE is originating.

- Taylor

On Feb 6, 2014, at 8:08 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

Thanks for pointing it out!
That test file is extremely helpful!

Hi all,
I'm using storm-cassandra (mentioned in this thread). I keep getting NPE:

10241 [Thread-21] ERROR com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor  - com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=23(24), attempts=1]java.lang.NullPointerException
com.netflix.astyanax.connectionpool.exceptions.UnknownException: UnknownException: [host=10.10.6.80(10.10.6.80):9160, latency=23(24), attempts=1]java.lang.NullPointerException

The code I use is the following (for local, not cluster, run):

    val options = new chat.CassandraMapState.Options[TransactionalValue[_]]()
    options.columnFamily = "transactional"
    val clusterContext = chat.AstyanaxUtil.newClusterContext("10.10.6.80:9160")
    chat.AstyanaxUtil.createColumnFamily(clusterContext, "test", "transactional", "UTF8Type", "UTF8Type", "UTF8Type")

    val cassandraStateFactory: StateFactory = CassandraMapState.transactional(options)

    val spout = new FixedBatchSpout(new Fields("sentence"), 3,
      new Values("the cow jumped over the moon"),
      new Values("the man went to the store and bought some candy"),
      new Values("four score and seven years ago"),
      new Values("how many apples can you eat"))
    spout.setCycle(true)

    val wordCounts: TridentState = tridentBuilder.newStream("spout1", spout)
      .each(new Fields("sentence"), new Split(), new Fields("word"))
      .groupBy(new Fields("word"))
      .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))

    val cluster = new LocalCluster()
    val config = new Config()
    val clientConfig = new util.HashMap[String, Object]()
    clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "10.10.6.80:9160")
    clientConfig.put(StormCassandraConstants.CASSANDRA_STATE_KEYSPACE, "test")
    config.put("cassandra.config", clientConfig)  // must match the KEY from Options.clientConfigKey
    config.setMaxSpoutPending(100)
    config.setMaxSpoutPending(25)
    cluster.submitTopology("test", config, tridentBuilder.build())

This code creates keyspace test and column family transactional with a few other things.
So connection to Cassandra is fine.
The code that throws the exception is in the multiGet method:

        RowSliceQuery<Composite, String> query = this.keyspace.prepareQuery(cf).getKeySlice(keyNames);
        Rows<Composite, String> result = null;
        try {
            result = query.execute().getResult();   <--------- this gives the NPE
        } catch (ConnectionException e) {
            // TODO throw a specific error.
            throw new RuntimeException(e);          <---- caught here
        }

The keys passed to multiGet are: [[moon], [bought], [the], [some], [score], [cow], [went], [and], [to], [seven], [over], [store], [years], [jumped], [candy], [four], [ago], [man]]

Is this exception thrown because of a schema mismatch? If so, what column names/schema am I supposed to use to run this example code?

Thanks so much.
I'm getting close!!

-A
From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
Sent: February-06-14 2:10 PM
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions

It's in maven central:

        <groupId>com.hmsonline</groupId>
        <artifactId>storm-cassandra</artifactId>
        <version>0.4.0-rc4</version>

- Taylor

On Feb 6, 2014, at 2:05 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

Hi Taylor,
I will give this a try. What is the maven repository for it?

I've found several ones:
"com.hmsonline" % "hms-cassandra-rest" % "1.0.0"
"com.github.ptgoetz" % "storm-cassandra" % "0.1.2"

And now
"com.hmsonline" % "storm-cassandra" % "0.4.0-rc4"
from

-A
From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
Sent: February-06-14 11:31 AM
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions

Thanks Svend. Good explanation.

Adrian,

The storm-cassandra documentation could be better in terms of explaining how to use the MapState implementation, but there's a unit test that demonstrates basic usage:


Basically, you just need to point it to a keyspace + column family where the state data will be stored.

- Taylor

On Feb 6, 2014, at 3:25 AM, Svend Vanderveken <svend.vanderveken@gmail.com> wrote:

The logic of a map state is to keep a "state" somewhere; you can think of a Storm state as a big Map of key-values: the keys come from the groupBy and the values are the result of the aggregations. Conceptually, when your topology is talking to a State, you can imagine it's actually talking to a big HashMap (only there's a DB behind it for persistence, plus opaque logic for error handling).

Most of the time, I try not to have any other part of my product depend on the location or structure of the data stored in the DB, so I do not really need to be super specific about the storage structure: that is up to the IBackingMap implementation I am delegating to. Read or write access to the DB is done via the Storm primitive, not by accessing the DB directly. Don't forget there's also the stateQuery primitive you can use to read your stored state from another place.

There are ways to configure column families and column names; have a look at the super clear storm-cassandra doc to see how to do that with this implementation: https://github.com/hmsonline/storm-cassandra

My blog post of last year is indeed illustrating a full implementation including an in-house IBackingMap implementation. I think that approach is sometimes needed when we want fine-grained control over things. I should have made it clearer that this is not necessarily the default approach.

I hope this makes sense now.
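To make the "big HashMap with a DB behind it" picture concrete, here is a minimal self-contained Java sketch. The interface is redefined locally for illustration rather than imported from storm-core (the method shapes mirror storm.trident.state.map.IBackingMap's multiGet/multiPut over List<Object> keys, but this is not the real class, and the HashMap-backed "DB" is purely a stand-in):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BackingMapSketch {

    // Illustrative stand-in for storm.trident.state.map.IBackingMap<T>:
    // Trident hands the grouped keys to multiGet and the updated values to multiPut.
    interface SimpleBackingMap<T> {
        List<T> multiGet(List<List<Object>> keys);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }

    // A "DB" that is literally just a HashMap, to make the mental model concrete:
    // the list returned by multiGet is index-aligned with the key list, null for misses.
    static class InMemoryBackingMap<T> implements SimpleBackingMap<T> {
        private final Map<List<Object>, T> store = new HashMap<>();

        public List<T> multiGet(List<List<Object>> keys) {
            List<T> result = new ArrayList<>(keys.size());
            for (List<Object> key : keys) {
                result.add(store.get(key));  // null when the key was never written
            }
            return result;
        }

        public void multiPut(List<List<Object>> keys, List<T> vals) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), vals.get(i));
            }
        }
    }

    public static void main(String[] args) {
        InMemoryBackingMap<Long> counts = new InMemoryBackingMap<>();
        counts.multiPut(List.of(List.of("blue")), List.of(3L));
        System.out.println(counts.multiGet(List.of(List.of("blue"), List.of("green"))));  // [3, null]
    }
}
```

A real implementation (such as storm-cassandra's CassandraMapState) replaces the HashMap with reads and writes against the backend, which is exactly why topology code does not need to know the storage structure.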
On Wed, Feb 5, 2014 at 11:15 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
Thank you Svend and Adam.

Svend, I'm your reader and that tutorial is very useful. I've been spending lots of time looking at the code and that blog post.

BTW I initially thought you were adding the nulls incorrectly in Q3 below, but now I see you're doing it correctly.

I have a follow-up question:
Why do you say that "we do not implement multiget/multiput, we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend"?
I thought that I had to rewrite an IBackingMap implementation to correspond to the tuples and schema I have in my database. I use Cassandra.
I started with com.hmsonline.storm.cassandra.trident.CassandraState or trident.cassandra.CassandraState (they both implement IBackingMap) and I replaced multiGet and multiPut to match my db schema. (Well, I'm trying to do it.)

You are saying I can use CassandraState as it is? :D
If so, how would it even know what table my data should go into? It allows you to set the column family and a few other things where state will be saved (keyspace, column family, replication, rowKey). By state I think it means something like txID (transaction ID). Do you by any chance know what this state that CassandraState is saving is?
So as you can tell I have no idea how to use CassandraState.

Thanks again!
-A
From: Svend Vanderveken [mailto:svend.vanderveken@gmail.com]
Sent: February-05-14 2:56 PM
To: user@storm.incubator.apache.org
Subject: Re: Svend's blog - several questions

On Wed, Feb 5, 2014 at 6:22 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
I've read Svend's blog [http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/] multiple times and I have a few questions.

So you are my reader! Great :D
(You can post your questions on the blog itself; I'm more likely to spot them there.)

"Because we did a groupBy on one tuple field, each List contains here one single
String: the correlationId. Note that the list we return must have exactly the same
size as the list of keys, so that Storm knows what period corresponds to what key.
So for any key that does not exist in DB, we simply put a null in the resulting list."

Q1: Do the db keys come only from groupBy?

Yes, the key values arriving in the multiget are the field values by which we are grouping:
do groupBy(new Fields("color")) and you get things like "blue", "green", "flowerly romantic red"...

Q2: Can you do groupBy on multiple keys, like .groupBy("name").groupBy("id")?

Yes, the syntax is like this:

groupBy(new Fields("name", "id"))

That's the reason the keys in the multiget are List<Object> and not simply Object. We receive them in the order they are specified in the topology definition.
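As an illustration of that last point (not Trident's actual internals, just the shape of the data, with hypothetical field values): with groupBy(new Fields("name", "id")), each key handed to multiGet is a two-element List<Object>, in the declared field order.

```java
import java.util.List;

public class GroupByKeys {
    public static void main(String[] args) {
        // Hypothetical grouped tuples (name, id): after a groupBy on both
        // fields, state code sees one List<Object> per distinct combination,
        // with the values in the order the fields were declared.
        List<List<Object>> keys = List.of(
                List.of("alice", 1),
                List.of("bob", 2));

        for (List<Object> key : keys) {
            System.out.println("name=" + key.get(0) + " id=" + key.get(1));
        }
    }
}
```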

Q3: When we add null we keep the size of the results list the same as the keys list, but I don't understand how we make sure that key(3) points to the correct result(3).
After all, we're adding nulls at the end of the result list, not at the position of the missing key. I.e. if key(1) does not have an entry in the db, and the keys size is 5, we add null to the last position in results, not to results(1). This doesn't preserve consistency/order, so key(1) now gives result(1), which is not null as it should be. Is the code incorrect, or is the explanation on Svend's blog incorrect?

The order should indeed be respected, so if the strategy for handling a DB error in a multi-get is to put nulls, then they should indeed be at the index corresponding to the problematic key. Is there a part of my toy project code that is padding nulls at the end? If so, that's indeed a bug; please let me know where (or better, fork and send me a pull request).

Note that I'm not particularly recommending putting nulls in case of unrecoverable errors in a multi-get; that's actually a simplistic way of handling the error. The contract with Storm is either to fail or to return a list of the correct size in the correct order. The data itself and its semantics are up to the topology implementation, i.e. up to us.

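A sketch of the contract Svend describes, with a plain Map standing in for the DB (this is a hypothetical helper, not the blog's or storm-cassandra's actual code): the result list is built in key order, so a null lands at the exact index of any key that cannot be read, never padded at the end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderedMultiGet {
    // Build the result list in the same order as the keys: index i of the
    // result always corresponds to index i of the keys, null marking a miss.
    static <T> List<T> multiGet(Map<List<Object>, T> db, List<List<Object>> keys) {
        List<T> results = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            results.add(db.get(key));  // null at this index if the key is absent
        }
        return results;
    }

    public static void main(String[] args) {
        Map<List<Object>, Integer> db = new HashMap<>();
        db.put(List.of("blue"), 7);
        db.put(List.of("red"), 2);

        // "green" is missing: the null must land at index 1, not at the end.
        List<List<Object>> keys = List.of(List.of("blue"), List.of("green"), List.of("red"));
        System.out.println(multiGet(db, keys));  // [7, null, 2]
    }
}
```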
Moving on,
"Once this is loaded Storm will present the tuples having the same correlation ID
one by one to our reducer, the PeriodBuilder"

Q4: Does Trident/Storm call the reducer after calling multiGet and before calling multiPut?

Yes.

Q5: What params (and their types) are passed to the reducer and what parameters should it emit so they can go into multiGet?

The reducer is called iteratively: it starts with the state found in the DB (returned by the multiget) and the first grouped tuple, then the second, then the third... until the last tuple. The return value of the last call of the reducer is what is provided to the multiput, for the same key as the multiget.

"Reduce" is actually a very common pattern in functional programming, which we Java programmers are sometimes less aware of. Look up some general doc on "reduce"; the Storm approach to it is very traditional, i.e. Storm has defined the "reduce" primitive exactly the way many other tools define it.

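A minimal sketch of that iteration, with plain Java standing in for Trident (the method names and the Long state type here are hypothetical, not Trident's API): the state loaded by multiGet seeds the fold, each grouped tuple updates it in turn, and the final value is what goes to multiPut. Note the null check, which matters on the first run when multiGet finds nothing for the key (see Q6 below in the original thread).

```java
import java.util.List;

public class ReducerSemantics {
    // Stand-in for a reducer: combine the current state with one tuple.
    // A null state means the key was absent from the DB.
    static Long reduce(Long state, long tupleValue) {
        if (state == null) {
            return tupleValue;        // first run: initialize instead of update
        }
        return state + tupleValue;    // subsequent calls: fold the tuple in
    }

    // What conceptually happens between multiGet and multiPut for one key.
    static Long foldBatch(Long stateFromMultiGet, List<Long> groupedTuples) {
        Long state = stateFromMultiGet;
        for (long t : groupedTuples) {
            state = reduce(state, t);  // called once per tuple, in order
        }
        return state;                  // this final value goes to multiPut
    }

    public static void main(String[] args) {
        // Empty DB on the first run: the state starts as null.
        System.out.println(foldBatch(null, List.of(1L, 2L, 3L)));  // 6
        // A later batch for the same key: state 6 was loaded by multiGet.
        System.out.println(foldBatch(6L, List.of(4L)));            // 10
    }
}
```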
Q6: The first time the program is run the database is empty and multiGet will return nothing.
Does the reducer need to take care and make sure to insert for the first time as opposed to update a value? I do see that the reducer (TimelineUpdater) checks for nulls and I'm guessing this is the reason why it does so.

Exactly.

That's also why returning null in case of error in the multiget is questionable and probably not what you would systematically do: it is equivalent to saying "there's garbage in persistence for that key, so let's just consider there's nothing". The proper thing to do depends on the task at hand, but such an error in multiget is often a symptom that we stored garbage in persistence in the past due to some other problem, and it's too late to correct it now.

Last thing: most of the time we do not implement multiget/multiput; we just take an existing implementation for Cassandra or Memcached or anything and it does what's right for that backend.

Q7:
Can someone explain what these mean:
.each  (I've seen this used even consecutively: .each(..).each(..) )
.newStream
.newValuesStream
.persistAggregate

I think they are all detailed here: https://github.com/nathanmarz/storm/wiki/Trident-API-Overview

I am unable to find javadocs with documentation for the method signatures.
These javadocs don't help much: http://nathanmarz.github.io/storm/doc/storm/trident/Stream.html

Q8:
Storm has ack/fail; does Trident handle that automatically?

Yes, although you can also explicitly trigger errors. Look out for my next blog post: error handling in Storm Trident.

Q9: Has anyone tried Spark? http://spark.incubator.apache.org/streaming/
I'm wondering if anyone has tried it because I'm thinking of ditching Storm and moving to that.
It seems much, much, much better documented.

Spark looks cool. I've not played with it yet, no. Go ahead, keep us posted on what you find out!

Lots of questions, I know. Thanks for reading!

and you :D

-Adrian

Svend