You want to run the Thrift function describe_schema_versions. It will tell you which nodes are on which schema versions; poll it and wait until there is a single version.

From memory this is what the Cassandra CLI does; have a look in the code base in the o.a.c.cli package.

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 3/08/2011, at 6:04 AM, Tharindu Mathew wrote:

> I ran across a problem when trying to execute the following code through Hector.
>
> private boolean createCF(String CFName) {
>     BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
>     columnFamilyDefinition.setColumnType(ColumnType.STANDARD);
>     columnFamilyDefinition.setName(CFName);
>     columnFamilyDefinition.setKeyspaceName(ReceiverConstants.BAM_KEYSPACE);
>
>     synchronized (this) {
>         boolean cfDefFound = false;
>         for (ColumnFamilyDefinition cfDef : bamKeyspaceDefinition.getCfDefs()) {
>             log.info("CF found : " + cfDef.getName());
>             if (cfDef.getName().equals(CFName)) {
>                 cfDefFound = true;
>                 break;
>             }
>         }
>         // Column Family not found, so create it
>         if (!cfDefFound) {
>             ThriftCfDef cfDef = new ThriftCfDef(columnFamilyDefinition);
>             cluster.addColumnFamily(cfDef);
>         }
>         cfList.add(CFName);
>     }
>     return true;
> }
>
> Even though the code block is synchronized, under load other threads that enter this block still do not see that the CF has been added, which results in [1].
>
> A workaround would be going for a thread sleep. But that seems too hacky for me :(. Is there a way to properly fix this? Maybe a blocking method exists that doesn't return until this schema change is propagated through all nodes.
>
> BTW, I only have one node running.
>
> Any help would be greatly appreciated.
>
> --
> Regards,
>
> Tharindu
>
>
> [1] -
>
> [2011-08-02 22:46:44,892] INFO {me.prettyprint.cassandra.hector.TimingLogger} - start[1312305404883] time[9] tag[META_WRITE.fail_]
> me.prettyprint.hector.api.exceptions.HInvalidRequestException: InvalidRequestException(why:CF is already defined in that keyspace.)
>     at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:42)
>     at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:68)
>     at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:62)
>     at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
>     at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:156)
>     at me.prettyprint.cassandra.service.ThriftCluster.addColumnFamily(ThriftCluster.java:72)
>     at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.createCF(NoSQLDataStore.java:168)
>     at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.persistData(NoSQLDataStore.java:124)
>     at org.wso2.carbon.bam.receiver.persistence.PersistenceManager.persistEvent(PersistenceManager.java:55)
>     at org.wso2.carbon.bam.receiver.internal.QueueWorker.run(QueueWorker.java:69)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> Caused by: InvalidRequestException(why:CF is already defined in that keyspace.)
>     at org.apache.cassandra.thrift.Cassandra$system_add_column_family_result.read(Cassandra.java:23375)
>     at org.apache.cassandra.thrift.Cassandra$Client.recv_system_add_column_family(Cassandra.java:1333)
>     at org.apache.cassandra.thrift.Cassandra$Client.system_add_column_family(Cassandra.java:1308)
>     at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:66)
>
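
For reference, a minimal sketch of the polling approach suggested at the top of this message: call describe_schema_versions repeatedly until only one schema version is reported, with a timeout instead of a blind sleep. The class name SchemaAgreement, the method waitForAgreement, and its timeout and poll parameters are hypothetical and not part of Hector or Cassandra. The Supplier stands in for the real Thrift call (describe_schema_versions returns a map of schema version to node addresses and throws checked exceptions, which a real caller would need to wrap). Skipping an "UNREACHABLE" key is an assumption about how down nodes are reported; check it against your Cassandra version.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class SchemaAgreement {
    // Assumed key under which nodes that did not respond are grouped.
    static final String UNREACHABLE = "UNREACHABLE";

    /**
     * Polls describeVersions until the reachable nodes report exactly one
     * schema version. Returns true on agreement, false if timeoutMillis
     * elapses first or the calling thread is interrupted.
     */
    static boolean waitForAgreement(Supplier<Map<String, List<String>>> describeVersions,
                                    long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            Map<String, List<String>> versions = describeVersions.get();
            // Count distinct schema versions, ignoring the unreachable bucket.
            long live = versions.keySet().stream()
                    .filter(v -> !UNREACHABLE.equals(v))
                    .count();
            if (live == 1) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In createCF this would be called right after cluster.addColumnFamily(cfDef), passing a supplier that wraps the describe_schema_versions call on whatever client you have available.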