storm-user mailing list archives

From "P. Taylor Goetz" <>
Subject Re: trident simplest aggregator into cassandra
Date Mon, 03 Feb 2014 22:54:40 GMT
The storm-cassandra project is actively maintained
(DISCLAIMER: I’m the original author), so I would give that one a try. It’s also aligned
with the latest version of storm. 

Also, which version of storm are you using?

- Taylor

On Feb 3, 2014, at 5:34 PM, Adrian Mocanu <> wrote:

> It’s from trident.cassandra:
> libraryDependencies += "trident-cassandra" % "trident-cassandra" % "0.0.1-wip2"
> I modified the file, though, and removed an IMetrics parameter. (It seems I have two
> interfaces for it in my code, so at runtime it picked the other interface, which does
> not have that parameter, and crashed.) Anyhow, I saw that there is another version of
> CassandraState in the hmsonline project
> (libraryDependencies += "com.hmsonline" % "storm-cassandra" % "0.4.0-rc4"),
> which is very different. I did not use that one; I thought it was the older version.
> Thanks
> A
> From: P. Taylor Goetz [] 
> Sent: February-03-14 5:21 PM
> To:
> Subject: Re: trident simplest aggregator into cassandra
> Which project did the CassandraState implementation come from?
> On Feb 3, 2014, at 5:09 PM, Adrian Mocanu <> wrote:
> Hi
> I'm using Trident to perform some aggregations and store the results in Cassandra.
> I've looked at IBackingMap, and specifically at some tutorials on the Trident site, and
> I've tried using a CassandraState that I found online in some repository. After creating
> what I thought were the column family and keys corresponding to the code, I still cannot
> figure out how to run the sample topology without it crashing on a Cassandra schema error:
> InvalidRequestException(why:Invalid cell for CQL3 table state. The CQL3 column component
> (over) does not correspond to a defined CQL3 column)
> Here is the sample code I use:
>     val cassandraStateFactory: StateFactory = chat.CassandraState.transactional("")
>     val spout = new FixedBatchSpout(new Fields("sentence"), 3,
>       new Values("the cow jumped over the moon"),
>       new Values("the man went to the store and bought some candy"),
>       new Values("four score and seven years ago"),
>       new Values("how many apples can you eat"))
>     spout.setCycle(true)
>     val wordCounts: TridentState = tridentBuilder.newStream("spout1", spout)
>       .each(new Fields("sentence"), new Split(), new Fields("word"))
>       .groupBy(new Fields("word"))
>       .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))
>       .parallelismHint(6)
>     val cluster = new LocalCluster();
>     val config = new Config();
>     config.setMaxSpoutPending(25);
>     cluster.submitTopology("test", config,;
> What is the schema needed to run this example (it also uses CassandraState)?
> thanks
> A
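
For reference, the aggregation in the quoted sample code can be sketched in plain Scala, with no Storm or Cassandra involved. This is only an illustration under stated assumptions: it assumes Split() tokenizes each sentence on single spaces (as the storm-starter function of that name does), and WordCountSketch, split and wordCounts are hypothetical names that do not come from the thread. It shows which keys the state is asked to store: one entry per distinct word, including "over", the component named in the error message. That suggests, without confirming it, that the CassandraState in use writes the grouped word as part of the column name.

```scala
// Plain-Scala sketch of one pass over the spout's sentences.
// Hypothetical names; not part of Storm or of any CassandraState library.
object WordCountSketch {
  // The same four sentences that the FixedBatchSpout emits.
  val sentences: Seq[String] = Seq(
    "the cow jumped over the moon",
    "the man went to the store and bought some candy",
    "four score and seven years ago",
    "how many apples can you eat")

  // Stand-in for Split(): assumed to emit one word per space-separated token.
  def split(sentence: String): Seq[String] = sentence.split(" ").toSeq

  // Stand-in for groupBy("word") followed by Count(): word -> occurrences.
  def wordCounts(input: Seq[String]): Map[String, Long] =
    input.flatMap(split).groupBy(identity).map { case (word, occurrences) =>
      word -> occurrences.size.toLong
    }

  def main(args: Array[String]): Unit =
    // Print the key/value pairs a backing map would be asked to persist.
    wordCounts(sentences).toSeq.sortBy(_._1).foreach { case (word, count) =>
      println(s"$word\t$count")
    }
}
```

Because the spout cycles, the real topology keeps adding to these counts batch after batch; the sketch covers a single pass only.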
