You want to call the Thrift function describe_schema_versions, which tells you which nodes are on which schema version, and wait until there is a single version.

From memory this is what the Cassandra CLI does. Have a look in the code base in the o.a.c.cli package.
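
A rough sketch of that wait loop, in case it helps. It is not the CLI's actual code. describe_schema_versions returns a map of schema version to the list of nodes on that version; the sketch assumes unreachable nodes are reported under the key "UNREACHABLE" (check this against your Cassandra version). The SchemaAgreement class, its method names, and the timeout/poll parameters are made up for illustration, and the Thrift call is passed in as a Callable so the sketch does not depend on any particular client.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

public class SchemaAgreement {
    // Assumption: nodes that cannot be reached are grouped under this key.
    static final String UNREACHABLE = "UNREACHABLE";

    /** True when all reachable nodes report exactly one schema version. */
    static boolean inAgreement(Map<String, List<String>> versions) {
        long liveVersions = versions.keySet().stream()
                .filter(v -> !UNREACHABLE.equals(v))
                .count();
        return liveVersions == 1;
    }

    /**
     * Polls the supplied call until the schema has settled or the timeout
     * expires. Returns true on agreement, false on timeout.
     */
    static boolean waitForAgreement(Callable<Map<String, List<String>>> describeSchemaVersions,
                                    long timeoutMillis, long pollMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        do {
            if (inAgreement(describeSchemaVersions.call())) {
                return true;
            }
            Thread.sleep(pollMillis);
        } while (System.currentTimeMillis() < deadline);
        return false;
    }
}
```

With a raw Thrift client in hand (call it client), you would call something like SchemaAgreement.waitForAgreement(() -> client.describe_schema_versions(), 10000, 100) right after adding the column family, and only carry on once it returns true.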

Cheers



-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 3/08/2011, at 6:04 AM, Tharindu Mathew <mccloud35@gmail.com> wrote:

I ran across a problem when trying to execute the following code through Hector.

private boolean createCF(String CFName) {
        BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
        columnFamilyDefinition.setColumnType(ColumnType.STANDARD);
        columnFamilyDefinition.setName(CFName);
        columnFamilyDefinition.setKeyspaceName(ReceiverConstants.BAM_KEYSPACE);

        synchronized (this) {
            boolean cfDefFound = false;
            for (ColumnFamilyDefinition cfDef : bamKeyspaceDefinition.getCfDefs()) {
                log.info("CF found : " + cfDef.getName());
                if (cfDef.getName().equals(CFName)) {
                    cfDefFound = true;
                    break;
                }
            }
            // Column Family not found, so create it
            if (!cfDefFound) {
                ThriftCfDef cfDef = new ThriftCfDef(columnFamilyDefinition);
                cluster.addColumnFamily(cfDef);
            }
            cfList.add(CFName);
        }
        return true;
    }

Even though the code block is synchronized, under load other threads that enter this block still do not see that the CF has been added, which results in [1].

A workaround would be a thread sleep, but that seems too hacky to me :(. Is there a proper way to fix this? Maybe a blocking method exists that doesn't return until the schema change has propagated to all nodes.

BTW, I only have one node running.

Any help would be greatly appreciated.

--
Regards,

Tharindu


[1] -

[2011-08-02 22:46:44,892]  INFO {me.prettyprint.cassandra.hector.TimingLogger} -  start[1312305404883] time[9] tag[META_WRITE.fail_]
me.prettyprint.hector.api.exceptions.HInvalidRequestException: InvalidRequestException(why:CF is already defined in that keyspace.)
at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:42)
at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:68)
at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:62)
at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:156)
at me.prettyprint.cassandra.service.ThriftCluster.addColumnFamily(ThriftCluster.java:72)
at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.createCF(NoSQLDataStore.java:168)
at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.persistData(NoSQLDataStore.java:124)
at org.wso2.carbon.bam.receiver.persistence.PersistenceManager.persistEvent(PersistenceManager.java:55)
at org.wso2.carbon.bam.receiver.internal.QueueWorker.run(QueueWorker.java:69)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
Caused by: InvalidRequestException(why:CF is already defined in that keyspace.)
at org.apache.cassandra.thrift.Cassandra$system_add_column_family_result.read(Cassandra.java:23375)
at org.apache.cassandra.thrift.Cassandra$Client.recv_system_add_column_family(Cassandra.java:1333)
at org.apache.cassandra.thrift.Cassandra$Client.system_add_column_family(Cassandra.java:1308)
at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:66)