storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guy Wald <...@guywald.com>
Subject Inserting rows to cassandra using Storm Trident
Date Sat, 08 Feb 2014 20:46:27 GMT
Hi,

I'm trying to insert a simple row to a table in  Cassandra 2.0.5. Storm
version: 0.9.0.1.

My test is as follows:
I have a table consisting of an id (int) and sentence (text) colum. id is
the primary key.

My spout generates sentences and I add an ID (static increment in the code).
This is my topology:

    TridentTopology topology = new TridentTopology();
    StateFactory cassandraStateFactory =
CassandraMapState.nonTransactional(options);
    Fields fields = new Fields("id", "sentence");
    MyTridentTupleMapper tupleMapper = new MyTridentTupleMapper(keyspace,
fields);
    CassandraUpdater updater = new CassandraUpdater(tupleMapper);
    TridentState wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new AddId(), new Fields("id"))
        .partitionPersist(cassandraStateFactory, fields, updater);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", config, topology.build());

The code for MyTridentTupleMapper:

https://github.com/guywald/trident-cassandra-read-write-examples/blob/master/src/test/java/com/guywald/storm/trident/cassandra/MyTridentTupleMapper.java

*I get the following exception:*

2014-02-08 22:20:14 ERROR executor:0 -
java.lang.RuntimeException: java.lang.ClassCastException:
storm.trident.state.map.SnapshottableMap cannot be cast to
com.hmsonline.storm.cassandra.trident.CassandraState
 at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
 at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
at
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
 at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:744)

I'm not sure why it returns this and would appreciate help.

Thanks.

Mime
View raw message