The storm-cassandra project (https://github.com/hmsonline/storm-cassandra) is actively maintained
(DISCLAIMER: I’m the original author), so I would give that one a try. It’s also aligned
with the latest version of storm.
Also, which version of storm are you using?
- Taylor
On Feb 3, 2014, at 5:34 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
> It’s from trident.cassandra
> libraryDependencies += "trident-cassandra" %"trident-cassandra" % "0.0.1-wip2"
>
> but I modified the file and removed an IMetrics parameter (it seems I have 2 interfaces
for it in my code so during runtime it would think it was the other interface which did not
have that param and crashed) Anyhow, I saw that there is another version of CassandraState
in hmsonline project (libraryDependencies += "com.hmsonline" % "storm-cassandra" % "0.4.0-rc4")
which is very different. I did not use that one – I thought that was the older version.
>
> Thanks
> A
> From: P. Taylor Goetz [mailto:ptgoetz@gmail.com]
> Sent: February-03-14 5:21 PM
> To: user@storm.incubator.apache.org
> Subject: Re: trident simplest aggregator into cassandra
>
> Which project did the CassandraState implementation come from?
>
>
> On Feb 3, 2014, at 5:09 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:
>
>
> Hi
> I'm using Trident to perform some aggregations and store the results into cassandra.
>
> I've looked at IBackingMap and specifically at some tutorials on trident site and I've
tried using CassandraState which I found online in some repository. After creating what I
thought were column family and keys corresponding to the code I still cannot figure out how
to run the sample topology and not crash due to some Cassandra schema error (InvalidRequestException(why:Invalid
cell for CQL3 table state. The CQL3 column component (over) does not correspond to a defined
CQL3 column).
>
> Here is the sample code I use:
>
> val cassandraStateFactory:StateFactory = chat.CassandraState.transactional("10.10.6.80")
>
> val spout = new FixedBatchSpout(new Fields("sentence"), 3,
> new Values("the cow jumped over the moon"),
> new Values("the man went to the store and bought some candy"),
> new Values("four score and seven years ago"),
> new Values("how many apples can you eat"))
> spout.setCycle(true)
>
> val wordCounts :TridentState= tridentBuilder.newStream("spout1", spout)
> .each(new Fields("sentence"), new Split(), new Fields("word"))
> .groupBy(new Fields("word"))
> .persistentAggregate(cassandraStateFactory, new Count(), new Fields("count"))
> .parallelismHint(6)
>
> val cluster = new LocalCluster();
> val config = new Config();
> config.setMaxSpoutPending(100);
> config.setMaxSpoutPending(25);
> cluster.submitTopology("test", config, tridentBuilder.build());
>
>
> What is the schema needed to run this example (it also uses CassandraState)?
>
> thanks
> A
|