storm-user mailing list archives

From "Dan DeCapria, CivicScience" <dan.decap...@civicscience.com>
Subject Re: Storm Cassandra and the CassandraCounterBatchingBolt
Date Tue, 11 Feb 2014 15:54:33 GMT
Steve, could the input to your counter bolt be malformed with respect to
your Cassandra schema? Have you successfully written to Cassandra with this
setup and Storm before?

So, I'm very new to Storm, but I've gotten Storm-Cassandra to work in my
setup. For reference, the Maven dependency I'm using is:

        <dependency>
            <groupId>com.hmsonline</groupId>
            <artifactId>storm-cassandra</artifactId>
            <version>0.4.0-rc4</version>
        </dependency>

Cassandra is 2.0.4 (DataStax dsc20-2.0.4-1 and cassandra20-2.0.4-1), with
Oracle Java 7u51 x64, Storm 0.9.0.1, and Zk 3.4.5. Cassandra schema:

        CREATE KEYSPACE stormks;
        CREATE COLUMN FAMILY mycf WITH
            default_validation_class=CounterColumnType AND
            key_validation_class=AsciiType AND comparator=AsciiType;

I have had success writing into Cassandra by creating my own
CassandraWriterBolt class, which ultimately extends the Astyanax client
through the storm-cassandra package's CassandraBolt. There are most likely
many better ways to do this, but it works for me thus far, so I've
attached the class and implementation to this email thread for reference.
Example usage for your topology:

        String cassandra_host = "localhost:9160";
        String cassandra_config_key = "cassandra-config";
        String cassandra_key_space = "stormks";
        String cassandra_cf = "mycf";
        String cassandra_name_key = "key";
        String cassandra_name_inc_amount = "inc_amount";
        CassandraWriterBolt<String,String,String> cassandraWriterBolt =
                new CassandraWriterBolt<>(
                        cassandra_host, cassandra_config_key, cassandra_key_space,
                        cassandra_cf, cassandra_name_key, cassandra_name_inc_amount
                );
        /***/
        conf.put(cassandraWriterBolt.getClientConfigKey(),
                cassandraWriterBolt.getClientConfig());

The setup bolt that forms the output tuple streamed to the
CassandraWriterBolt implementation just needs to stay consistent with the
expectations set when the CassandraWriterBolt was instantiated, namely the
cassandra_name_key and cassandra_name_inc_amount field names.
For example, I'd send a Tuple ~
("myCassKey",1L,"aCassColumnNameWCounter","bCassColumnNameWCounter",...etc.)
with the declarer in declareOutputFields given new Fields(new
String[]{"key","inc_amount","column_a","column_b",...etc.}), which should
update Cassandra's stormks::mycf: myCassKey[aCassColumnNameWCounter] += 1L,
myCassKey[bCassColumnNameWCounter] += 1L.
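
To make that mapping concrete, here is a small plain-Java sketch of how I
read the tuple-to-counter convention described above. It does not use Storm
or storm-cassandra at all, and it is not the attached CassandraWriterBolt;
the class CounterTupleSketch and its methods rowKey and toIncrements are
hypothetical names made up for illustration. The assumption is that the
"key" field holds the row key, the "inc_amount" field holds the increment,
and every remaining field value names a counter column.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CounterTupleSketch {

    /** Hypothetical helper: returns the row key carried in the tuple's key field. */
    public static String rowKey(List<String> fields, List<Object> values, String keyField) {
        int keyIndex = fields.indexOf(keyField);
        if (keyIndex < 0) {
            throw new IllegalArgumentException("missing field: " + keyField);
        }
        return (String) values.get(keyIndex);
    }

    /**
     * Hypothetical helper: maps one tuple to per-column counter increments.
     * Every field other than keyField and incField is assumed to carry a
     * counter column name, and each such column gets the same increment.
     */
    public static Map<String, Long> toIncrements(List<String> fields, List<Object> values,
                                                 String keyField, String incField) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values differ in length");
        }
        int incIndex = fields.indexOf(incField);
        if (fields.indexOf(keyField) < 0 || incIndex < 0) {
            throw new IllegalArgumentException("tuple must contain the key and increment fields");
        }
        long amount = ((Number) values.get(incIndex)).longValue();

        Map<String, Long> increments = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            String field = fields.get(i);
            if (field.equals(keyField) || field.equals(incField)) {
                continue; // not a counter column
            }
            String column = (String) values.get(i);
            Long previous = increments.get(column);
            // A column named twice in one tuple accumulates both increments.
            increments.put(column, previous == null ? amount : previous + amount);
        }
        return increments;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("key", "inc_amount", "column_a", "column_b");
        List<Object> values = Arrays.<Object>asList(
                "myCassKey", 1L, "aCassColumnNameWCounter", "bCassColumnNameWCounter");

        String key = rowKey(fields, values, "key");
        for (Map.Entry<String, Long> e : toIncrements(fields, values, "key", "inc_amount").entrySet()) {
            System.out.println(key + "[" + e.getKey() + "] += " + e.getValue());
        }
    }
}
```

If the field names declared by the upstream bolt do not match the two names
passed to the writer bolt's constructor, a lookup like the one above would
fail, which may be worth checking in your topology.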

Hope this helps towards fixing your issue,

-Dan
