gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [1/2] git commit: forward port of Make Cassandra keyspace consistency configurable within gora.properties
Date Fri, 15 Aug 2014 16:51:01 GMT
Repository: gora
Updated Branches:
  refs/heads/master dc76da920 -> 3cacf2a25


forward port of Make Cassandra keyspace consistency configurable within gora.properties


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/68302e21
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/68302e21
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/68302e21

Branch: refs/heads/master
Commit: 68302e213747700ca38cef546f05d0f15a3e99b4
Parents: dc76da9
Author: Lewis John McGibbney <lewis.j.mcgibbney@jpl.nasa.gov>
Authored: Fri Aug 15 09:49:14 2014 -0700
Committer: Lewis John McGibbney <lewis.j.mcgibbney@jpl.nasa.gov>
Committed: Fri Aug 15 09:49:14 2014 -0700

----------------------------------------------------------------------
 .../gora/cassandra/store/CassandraClient.java   | 131 ++++++++++++++-----
 .../gora/cassandra/store/CassandraStore.java    |  25 +++-
 gora-cassandra/src/test/conf/gora.properties    |  15 +--
 .../gora/dynamodb/store/DynamoDBStore.java      |   8 +-
 4 files changed, 133 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index 1d56e32..9e8cd7b 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -18,11 +18,16 @@
 
 package org.apache.gora.cassandra.store;
 
+import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
+import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -57,7 +62,20 @@ import org.apache.gora.query.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * CassandraClient is where all of the primary datastore functionality is 
+ * executed. Typically CassandraClient is invoked by calling 
+ * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
+ * CassandraClient deals with Cassandra data model definition, mutation, 
+ * and general/specific mappings.
+ * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}

+ *
+ * @param <K>
+ * @param <T>
+ */
 public class CassandraClient<K, T extends PersistentBase> {
+  
+  /** The logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
   
   private Cluster cluster;
@@ -66,10 +84,29 @@ public class CassandraClient<K, T extends PersistentBase> {
   private Class<K> keyClass;
   private Class<T> persistentClass;
   
+  /** Object which holds the XML mapping for Cassandra. */
   private CassandraMapping cassandraMapping = null;
 
+  /** Hector client default column family consistency level. */
+  public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
+  
+  /** Cassandra serializer to be used for serializing Gora's keys. */
   private Serializer<K> keySerializer;
   
+  /**
+   * Given our key, persistentClass from 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
+   * we make best efforts to dictate our data model. 
+   * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String)
+   * to see if our keyspace has already been invented, this simple check prevents us from

+   * recreating the keyspace if it already exists. 
+   * We then simple specify (based on the input keyclass) an appropriate serializer
+   * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
+   * defining a mutator from and by which we can mutate this object.
+   * @param keyClass the Key by which we wish o assign a record object
+   * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent}
bean representing the data.
+   * @throws Exception
+   */
   public void initialize(Class<K> keyClass, Class<T> persistentClass) throws
Exception {
     this.keyClass = keyClass;
 
@@ -77,12 +114,14 @@ public class CassandraClient<K, T extends PersistentBase> {
     this.persistentClass = persistentClass;
     this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
 
-    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new
CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), 
+        new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
     
     // add keyspace to cluster
     checkKeyspace();
     
-    // Just create a Keyspace object on the client side, corresponding to an already existing
keyspace with already created column families.
+    // Just create a Keyspace object on the client side, corresponding to an already 
+    // existing keyspace with already created column families.
     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
     
     this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
@@ -99,10 +138,17 @@ public class CassandraClient<K, T extends PersistentBase> {
   
   /**
    * Check if keyspace already exists. If not, create it.
-   * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel}
-   * logic. It is set by passing a ConfigurableConsistencyLevel object right 
-   * when the Keyspace is created. Currently consistency level is .ONE which 
-   * permits consistency to wait until one replica has responded. 
+   * In this method, we also utilize Hector's 
+   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} logic. 
+   * It is set by passing a 
+   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right 
+   * when the {@link me.prettyprint.hector.api.Keyspace} is created. 
+   * If we cannot find a consistency level within <code>gora.properites</code>,

+   * then column family consistency level is set to QUORUM (by default) which permits 
+   * consistency to wait for a quorum of replicas to respond regardless of data center.
+   * QUORUM is Hector Client's default setting and we respect that here as well.
+   * 
+   * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html
    */
   public void checkKeyspace() {
     // "describe keyspace <keyspaceName>;" query
@@ -116,29 +162,29 @@ public class CassandraClient<K, T extends PersistentBase> {
       }
 
       keyspaceDefinition = HFactory.createKeyspaceDefinition(
-    	this.cassandraMapping.getKeyspaceName(), 
+        this.cassandraMapping.getKeyspaceName(), 
         this.cassandraMapping.getKeyspaceReplicationStrategy(),
         this.cassandraMapping.getKeyspaceReplicationFactor(),
         columnFamilyDefinitions
       );
       
       this.cluster.addKeyspace(keyspaceDefinition, true);
-      // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster
'" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName()
+ "'");
       
-      // Create a customized Consistency Level
-      ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
-      Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
-
-      // Define CL.ONE for ColumnFamily "ColumnFamily"
-      clmap.put("ColumnFamily", HConsistencyLevel.ONE);
-
-      // In this we use CL.ONE for read and writes. But you can use different CLs if needed.
-      configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
-      configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
-
-      // Then let the keyspace know
-      HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
-
+      // GORA-167 Create a customized Consistency Level
+      ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
+      Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
+      // Column family consistency levels
+      ccl.setReadCfConsistencyLevels(clmap);
+      ccl.setWriteCfConsistencyLevels(clmap);
+      // Operations consistency levels
+      String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+      ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+      LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
+      opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+      ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
+      LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
+
+      HFactory.createKeyspace("Keyspace", this.cluster, ccl);
       keyspaceDefinition = null;
     }
     else {
@@ -164,6 +210,22 @@ public class CassandraClient<K, T extends PersistentBase> {
   }
 
   /**
+   * Method in charge of setting the consistency level for defined column families.
+   * @param pColFams  Column families
+   * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency
level.
+   */
+  private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition>
pColFams) {
+    Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>();
+    // Get columnFamily consistency level.
+    String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
+    LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'.");
+    // Define consistency for ColumnFamily "ColumnFamily"
+    for (ColumnFamilyDefinition colFamDef : pColFams)
+      clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl));
+    return clMap;
+  }
+  
+  /**
    * Drop keyspace.
    */
   public void dropKeyspace() {
@@ -259,10 +321,10 @@ public class CassandraClient<K, T extends PersistentBase> {
 
   /**
    * Adds an subColumn inside the cassandraMapping file when a String is serialized
-   * @param key
-   * @param fieldName
-   * @param columnName
-   * @param value
+   * @param key the row key
+   * @param fieldName the field name
+   * @param columnName the column name (the member name, or the index of array)
+   * @param value the member value
    */
   public void addSubColumn(K key, String fieldName, String columnName, Object value) {
     addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value);
@@ -270,10 +332,10 @@ public class CassandraClient<K, T extends PersistentBase> {
 
   /**
    * Adds an subColumn inside the cassandraMapping file when an Integer is serialized
-   * @param key
-   * @param fieldName
-   * @param columnName
-   * @param value
+   * @param key the row key
+   * @param fieldName the field name
+   * @param columnName the column name (the member name, or the index of array)
+   * @param value the member value
    */
   public void addSubColumn(K key, String fieldName, Integer columnName, Object value) {
     addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value);
@@ -323,6 +385,7 @@ public class CassandraClient<K, T extends PersistentBase> {
     //TODO Verify this. Everything that goes inside a genericArray will go inside a column
so let's just delete that.
     deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
   }
+  
   public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
     if (isSuper( cassandraMapping.getFamily(fieldName) )) {
       int i= 0;
@@ -426,7 +489,8 @@ public class CassandraClient<K, T extends PersistentBase> {
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
     
-    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace,
this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery
+        (this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
     rangeSlicesQuery.setColumnFamily(family);
     rangeSlicesQuery.setKeys(startKey, endKey);
     rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]),
false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -536,7 +600,8 @@ public class CassandraClient<K, T extends PersistentBase> {
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
     
-    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery
= HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(),
ByteBufferSerializer.get(), ByteBufferSerializer.get());
+    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery
= HFactory.createRangeSuperSlicesQuery
+        (this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(),
ByteBufferSerializer.get());
     rangeSuperSlicesQuery.setColumnFamily(family);    
     rangeSuperSlicesQuery.setKeys(startKey, endKey);
     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -556,6 +621,6 @@ public class CassandraClient<K, T extends PersistentBase> {
    * @return Keyspace
    */
   public String getKeyspaceName() {
-	return this.cassandraMapping.getKeyspaceName();
+    return this.cassandraMapping.getKeyspaceName();
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index ffb4af0..496f1f0 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -60,6 +60,7 @@ import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
 import org.slf4j.Logger;
@@ -76,7 +77,21 @@ public class CassandraStore<K, T extends PersistentBase> extends
DataStoreBase<K
   /** Logging implementation */
   public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
 
-  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
+  /** Consistency property level for Cassandra column families */
+  private static final String COL_FAM_CL = "cf.consistency.level";
+   
+  /** Consistency property level for Cassandra read operations. */
+  private static final String READ_OP_CL = "read.consistency.level";
+  
+  /** Consistency property level for Cassandra write operations. */
+  private static final String WRITE_OP_CL = "write.consistency.level";
+  
+  /** Variables to hold different consistency levels defined by the properties. */
+  public static String colFamConsLvl;
+  public static String readOpConsLvl;
+  public static String writeOpConsLvl;
+  
+  private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
 
   /**
    * Fixed string with value "UnionIndex" used to generate an extra column based on 
@@ -126,6 +141,14 @@ public class CassandraStore<K, T extends PersistentBase> extends
DataStoreBase<K
   public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties)
{
     try {
       super.initialize(keyClass, persistent, properties);
+      if (autoCreateSchema) {
+        // If this is not set, then each Cassandra client should set its default
+        // column family
+        colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null);
+        // operations
+        readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null);
+        writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null);
+      }
       this.cassandraClient.initialize(keyClass, persistent);
     } catch (Exception e) {
       LOG.error(e.getMessage());

http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/test/conf/gora.properties
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/test/conf/gora.properties b/gora-cassandra/src/test/conf/gora.properties
index 80427b4..6c8b06a 100644
--- a/gora-cassandra/src/test/conf/gora.properties
+++ b/gora-cassandra/src/test/conf/gora.properties
@@ -14,14 +14,13 @@
 # limitations under the License.
 
 gora.datastore.default=org.apache.gora.cassandra.CassandraStore
-gora.cassandrastore.keyspace=
-gora.cassandrastore.name=
-gora.cassandrastore.class=
-gora.cassandrastore.qualifier=
-gora.cassandrastore.family=
-gora.cassandrastore.type=
-gora.cassandraStore.cluster=Test Cluster
-gora.cassandraStore.host=localhost
+gora.cassandrastore.cluster=Test Cluster
+gora.cassandrastore.host=localhost
+# property is annotated in CassandraClient#checkKeyspace()
+# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL. 
+gora.cassandrastore.cf.consistency.level=ONE
+gora.cassandrastore.read.consistency.level=QUORUM
+gora.cassandrastore.write.consistency.level=ONE
 
 
 

http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
----------------------------------------------------------------------
diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
index 7192c8a..ee48542 100644
--- a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
+++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java
@@ -161,11 +161,11 @@ public class DynamoDBStore<K, T extends Persistent> extends WSDataStoreBase<K,
T
       LOG.debug("Initializing DynamoDB store");
       getCredentials();
       setWsProvider(wsProvider);
-      preferredSchema = properties.getProperty(PREF_SCH_NAME);
-      dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf());
-      dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP));
+      preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null);
+      dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP,
null),(AWSCredentials)getConf());
+      dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP,
null));
       mapping = readMapping();
-      consistency = properties.getProperty(CONSISTENCY_READS);
+      consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null);
       persistentClass = pPersistentClass;
     }
     catch (Exception e) {


Mime
View raw message