gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject svn commit: r1341239 - in /gora/trunk: ./ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-tutorial/ gora-tutorial/conf/
Date Mon, 21 May 2012 22:41:21 GMT
Author: lewismc
Date: Mon May 21 22:41:20 2012
New Revision: 1341239

URL: http://svn.apache.org/viewvc?rev=1341239&view=rev
Log:
commit to address GORA-131 & 132 respectively and update to CHANGES.txt

Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/trunk/gora-tutorial/conf/gora.properties
    gora/trunk/gora-tutorial/pom.xml

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Mon May 21 22:41:20 2012
@@ -6,6 +6,10 @@ Gora Change Log
 
 0.3 (trunk) Current Development:
 
+* GORA-131 gora-cassandra should support other key types than String (Kazuomi Kashii via
lewismc)
+
+* GORA-132 Uses ByteBufferSerializer for column value to support various data types rather
than StringSerializer (Kazuomi Kashii via lewismc)
+
 * GORA-77 Replace commons logging with Log4j (Renato Javier Marroquín Mogrovejo via lewismc)
 
 * GORA-134 ListGenericArray's hashCode causes StackOverflowError (Kazuomi Kashii via lewismc)

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
Mon May 21 22:41:20 2012
@@ -36,7 +36,7 @@ public class CassandraResult<K, T extend
   
   private int rowNumber;
 
-  private CassandraResultSet cassandraResultSet;
+  private CassandraResultSet<K> cassandraResultSet;
   
   /**
    * Maps Cassandra columns to Avro fields.
@@ -63,10 +63,10 @@ public class CassandraResult<K, T extend
    */
   @SuppressWarnings("unchecked")
   private void updatePersistent() throws IOException {
-    CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
+    CassandraRow<K> cassandraRow = this.cassandraResultSet.get(this.rowNumber);
     
     // load key
-    this.key = (K) cassandraRow.getKey();
+    this.key = cassandraRow.getKey();
     
     // load value
     Schema schema = this.persistent.getSchema();
@@ -104,7 +104,7 @@ public class CassandraResult<K, T extend
     return (((float) this.rowNumber) / this.cassandraResultSet.size());
   }
 
-  public void setResultSet(CassandraResultSet cassandraResultSet) {
+  public void setResultSet(CassandraResultSet<K> cassandraResultSet) {
     this.cassandraResultSet = cassandraResultSet;
   }
   

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
Mon May 21 22:41:20 2012
@@ -24,7 +24,7 @@ import java.util.HashMap;
 /**
  * List data structure to keep the order coming from the Cassandra selects.
  */
-public class CassandraResultSet extends ArrayList<CassandraRow> {
+public class CassandraResultSet<K> extends ArrayList<CassandraRow<K>> {
 
   /**
    * 
@@ -34,9 +34,9 @@ public class CassandraResultSet extends 
   /**
    * Maps keys to indices in the list.
    */
-  private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
+  private HashMap<K, Integer> indexMap = new HashMap<K, Integer>();
 
-  public CassandraRow getRow(String key) {
+  public CassandraRow<K> getRow(K key) {
     Integer integer = this.indexMap.get(key);
     if (integer == null) {
       return null;
@@ -45,7 +45,7 @@ public class CassandraResultSet extends 
     return this.get(integer);
   }
 
-  public void putRow(String key, CassandraRow cassandraRow) {
+  public void putRow(K key, CassandraRow<K> cassandraRow) {
     this.add(cassandraRow);
     this.indexMap.put(key, this.size()-1);
   } 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
Mon May 21 22:41:20 2012
@@ -23,19 +23,19 @@ import java.util.ArrayList;
 /**
  * List of key value pairs representing a row, tagged by a key.
  */
-public class CassandraRow extends ArrayList<CassandraColumn> {
+public class CassandraRow<K> extends ArrayList<CassandraColumn> {
 
   /**
    * 
    */
   private static final long serialVersionUID = -7620939600192859652L;
-  private String key;
+  private K key;
 
-  public String getKey() {
+  public K getKey() {
     return this.key;
   }
 
-  public void setKey(String key) {
+  public void setKey(K key) {
     this.key = key;
   }
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
Mon May 21 22:41:20 2012
@@ -24,6 +24,11 @@ import java.nio.charset.CharacterCodingE
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
 
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 
 import org.apache.avro.Schema;
@@ -46,7 +51,7 @@ public class CassandraSubColumn extends 
   /**
    * Key-value pair containing the raw data.
    */
-  private HColumn<String, String> hColumn;
+  private HColumn<String, ByteBuffer> hColumn;
 
   public String getName() {
     return hColumn.getName();
@@ -60,28 +65,31 @@ public class CassandraSubColumn extends 
     Field field = getField();
     Schema fieldSchema = field.schema();
     Type type = fieldSchema.getType();
-    String valueString = hColumn.getValue();
+    ByteBuffer valueByteBuffer = hColumn.getValue();
     Object value = null;
     
     switch (type) {
       case STRING:
-        value = new Utf8(valueString);
+        value = new Utf8(StringSerializer.get().fromByteBuffer(valueByteBuffer));
         break;
       case BYTES:
-        // convert string to bytebuffer
-        value = getByteBuffer(valueString);
+        value = valueByteBuffer;
         break;
       case INT:
-        value = Integer.parseInt(valueString);
+        value = IntegerSerializer.get().fromByteBuffer(valueByteBuffer);
         break;
       case LONG:
-        value = Long.parseLong(valueString);
+        value = LongSerializer.get().fromByteBuffer(valueByteBuffer);
         break;
       case FLOAT:
-        value = Float.parseFloat(valueString);
+        value = FloatSerializer.get().fromByteBuffer(valueByteBuffer);
+        break;
+      case DOUBLE:
+        value = DoubleSerializer.get().fromByteBuffer(valueByteBuffer);
         break;
       case ARRAY:
         // convert string to array
+        String valueString = StringSerializer.get().fromByteBuffer(valueByteBuffer);
         valueString = valueString.substring(1, valueString.length()-1);
         String[] elements = valueString.split(", ");
         
@@ -106,17 +114,7 @@ public class CassandraSubColumn extends 
 
   }
 
-  public void setValue(HColumn<String, String> hColumn) {
+  public void setValue(HColumn<String, ByteBuffer> hColumn) {
     this.hColumn = hColumn;
   }
-  
-  public static ByteBuffer getByteBuffer(String valueString) {
-    ByteBuffer byteBuffer = null;
-    try {
-      byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
-    } catch (CharacterCodingException cce) {
-      LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
-    }
-    return byteBuffer;
-  }
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
Mon May 21 22:41:20 2012
@@ -18,8 +18,14 @@
 
 package org.apache.gora.cassandra.query;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.HSuperColumn;
 
@@ -35,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class CassandraSuperColumn extends CassandraColumn {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
 
-  private HSuperColumn<String, String, String> hSuperColumn;
+  private HSuperColumn<String, String, ByteBuffer> hSuperColumn;
   
   public String getName() {
     return hSuperColumn.getName();
@@ -53,15 +59,27 @@ public class CassandraSuperColumn extend
         Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
         Type valueType = fieldSchema.getValueType().getType();
         
-        for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
-          String memberString = hColumn.getValue();
+        for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns())
{
+          ByteBuffer memberByteBuffer = hColumn.getValue();
           Object memberValue = null;
           switch (valueType) {
             case STRING:
-              memberValue = new Utf8(memberString);
+              memberValue = new Utf8(StringSerializer.get().fromByteBuffer(memberByteBuffer));
               break;
             case BYTES:
-              memberValue = CassandraSubColumn.getByteBuffer(memberString);
+              memberValue = memberByteBuffer;
+              break;
+            case INT:
+              memberValue = IntegerSerializer.get().fromByteBuffer(memberByteBuffer);
+              break;
+            case LONG:
+              memberValue = LongSerializer.get().fromByteBuffer(memberByteBuffer);
+              break;
+            case FLOAT:
+              memberValue = FloatSerializer.get().fromByteBuffer(memberByteBuffer);
+              break;
+            case DOUBLE:
+              memberValue = DoubleSerializer.get().fromByteBuffer(memberByteBuffer);
               break;
             default:
               LOG.info("Type for the map value is not supported: " + valueType);
@@ -97,7 +115,7 @@ public class CassandraSuperColumn extend
         if (value instanceof PersistentBase) {
           PersistentBase record = (PersistentBase) value;
 
-          for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
+          for (HColumn<String, ByteBuffer> hColumn : this.hSuperColumn.getColumns())
{
             Field memberField = fieldSchema.getField(hColumn.getName());
             CassandraSubColumn cassandraColumn = new CassandraSubColumn();
             cassandraColumn.setField(memberField);
@@ -113,7 +131,7 @@ public class CassandraSuperColumn extend
     return value;
   }
   
-  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
+  public void setValue(HSuperColumn<String, String, ByteBuffer> hSuperColumn) {
     this.hSuperColumn = hSuperColumn;
   }
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
Mon May 21 22:41:20 2012
@@ -25,7 +25,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
 import me.prettyprint.cassandra.service.CassandraHostConfigurator;
 import me.prettyprint.hector.api.Cluster;
 import me.prettyprint.hector.api.Keyspace;
@@ -42,7 +46,9 @@ import me.prettyprint.hector.api.query.R
 import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
 import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
 import me.prettyprint.hector.api.HConsistencyLevel;
+import me.prettyprint.hector.api.Serializer;
 
+import org.apache.avro.util.Utf8;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.mapreduce.GoraRecordReader;
 import org.apache.gora.persistency.Persistent;
@@ -56,13 +62,17 @@ public class CassandraClient<K, T extend
   
   private Cluster cluster;
   private Keyspace keyspace;
-  private Mutator<String> mutator;
+  private Mutator<K> mutator;
+  private Class<K> keyClass;
   
   private CassandraMapping cassandraMapping = new CassandraMapping();
 
   private StringSerializer stringSerializer = new StringSerializer();
+  private ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer();
+  private Serializer<K> keySerializer;
   
-  public void initialize() throws Exception {
+  public void initialize(Class<K> keyClass) throws Exception {
+    this.keyClass = keyClass;
     this.cassandraMapping.loadConfiguration();
     this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new
CassandraHostConfigurator(this.cassandraMapping.getHostName()));
     
@@ -72,7 +82,8 @@ public class CassandraClient<K, T extend
     // Just create a Keyspace object on the client side, corresponding to an already existing
keyspace with already created column families.
     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
     
-    this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
+    this.keySerializer = SerializerTypeInferer.getSerializer(keyClass);
+    this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
   }
   
   /**
@@ -125,28 +136,36 @@ public class CassandraClient<K, T extend
    * @param fieldName the field name
    * @param value the field value.
    */
-  public void addColumn(String key, String fieldName, Object value) {
+  public void addColumn(K key, String fieldName, Object value) {
     if (value == null) {
       return;
     }
+
+    ByteBuffer byteBuffer = null;
     if (value instanceof ByteBuffer) {
-      value = toString((ByteBuffer) value);
+      byteBuffer = (ByteBuffer) value;
+    }
+    else if (value instanceof Utf8) {
+      byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+    }
+    else if (value instanceof Float) {
+      // workaround for hector-core-1.0-1.jar
+      // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
+      byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
+    }
+    else if (value instanceof Double) {
+      // workaround for hector-core-1.0-1.jar
+      // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
+      byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+    }
+    else {
+      byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
     }
     
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String columnName = this.cassandraMapping.getColumn(fieldName);
     
-    this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
-  }
-
-  /**
-   * TODO do no convert bytes to string to store a binary field
-   * @param value
-   * @return
-   */
-  private static String toString(ByteBuffer value) {
-    ByteBuffer byteBuffer = (ByteBuffer) value;
-    return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
+    this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName, byteBuffer,
stringSerializer, byteBufferSerializer));
   }
 
   /**
@@ -157,19 +176,36 @@ public class CassandraClient<K, T extend
    * @param value the member value
    */
   @SuppressWarnings("unchecked")
-public void addSubColumn(String key, String fieldName, String memberName, Object value) {
+public void addSubColumn(K key, String fieldName, String memberName, Object value) {
     if (value == null) {
       return;
     }
     
+    ByteBuffer byteBuffer = null;
     if (value instanceof ByteBuffer) {
-      value = toString((ByteBuffer) value);
+      byteBuffer = (ByteBuffer) value;
+    }
+    else if (value instanceof Utf8) {
+      byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+    }
+    else if (value instanceof Float) {
+      // workaround for hector-core-1.0-1.jar
+      // because SerializerTypeInferer.getSerializer(Float ) returns ObjectSerializer !?
+      byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
+    }
+    else if (value instanceof Double) {
+      // workaround for hector-core-1.0-1.jar
+      // because SerializerTypeInferer.getSerializer(Double ) returns ObjectSerializer !?
+      byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+    }
+    else {
+      byteBuffer = SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
     }
     
     String columnFamily = this.cassandraMapping.getFamily(fieldName);
     String superColumnName = this.cassandraMapping.getColumn(fieldName);
     
-    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName,
value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
+    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createColumn(memberName,
byteBuffer, stringSerializer, byteBufferSerializer)), this.stringSerializer, this.stringSerializer,
this.byteBufferSerializer));
     
   }
   
@@ -179,23 +215,18 @@ public void addSubColumn(String key, Str
    * @param family the family name to be queried
    * @return a list of family rows
    */
-  public List<Row<String, String, String>> execute(CassandraQuery<K, T>
cassandraQuery, String family) {
+  public List<Row<K, String, ByteBuffer>> execute(CassandraQuery<K, T>
cassandraQuery, String family) {
     
     String[] columnNames = cassandraQuery.getColumns(family);
     Query<K, T> query = cassandraQuery.getQuery();
     int limit = (int) query.getLimit();
-    String startKey = (String) query.getStartKey();
-    String endKey = (String) query.getEndKey();
-    
-    if (startKey == null) {
-      startKey = "";
-    }
-    if (endKey == null) {
-      endKey = "";
+    if (limit < 1) {
+      limit = Integer.MAX_VALUE;
     }
+    K startKey = query.getStartKey();
+    K endKey = query.getEndKey();
     
-    
-    RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace,
this.stringSerializer, stringSerializer, stringSerializer);
+    RangeSlicesQuery<K, String, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace,
this.keySerializer, stringSerializer, byteBufferSerializer);
     rangeSlicesQuery.setColumnFamily(family);
     rangeSlicesQuery.setKeys(startKey, endKey);
     rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -203,8 +234,8 @@ public void addSubColumn(String key, Str
     rangeSlicesQuery.setColumnNames(columnNames);
     
 
-    QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
-    OrderedRows<String, String, String> orderedRows = queryResult.get();
+    QueryResult<OrderedRows<K, String, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
+    OrderedRows<K, String, ByteBuffer> orderedRows = queryResult.get();
     
     
     return orderedRows.getList();
@@ -259,22 +290,17 @@ public void addSubColumn(String key, Str
     return this.cassandraMapping.isSuper(family);
   }
 
-  public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K,
T> cassandraQuery, String family) {
+  public List<SuperRow<K, String, String, ByteBuffer>> executeSuper(CassandraQuery<K,
T> cassandraQuery, String family) {
     String[] columnNames = cassandraQuery.getColumns(family);
     Query<K, T> query = cassandraQuery.getQuery();
     int limit = (int) query.getLimit();
-    String startKey = (String) query.getStartKey();
-    String endKey = (String) query.getEndKey();
-    
-    if (startKey == null) {
-      startKey = "";
+    if (limit < 1) {
+      limit = Integer.MAX_VALUE;
     }
-    if (endKey == null) {
-      endKey = "";
-    }
-    
+    K startKey = query.getStartKey();
+    K endKey = query.getEndKey();
     
-    RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace,
this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
+    RangeSuperSlicesQuery<K, String, String, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace,
this.keySerializer, this.stringSerializer, this.stringSerializer, this.byteBufferSerializer);
     rangeSuperSlicesQuery.setColumnFamily(family);    
     rangeSuperSlicesQuery.setKeys(startKey, endKey);
     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -282,8 +308,8 @@ public void addSubColumn(String key, Str
     rangeSuperSlicesQuery.setColumnNames(columnNames);
     
     
-    QueryResult<OrderedSuperRows<String, String, String, String>> queryResult
= rangeSuperSlicesQuery.execute();
-    OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
+    QueryResult<OrderedSuperRows<K, String, String, ByteBuffer>> queryResult
= rangeSuperSlicesQuery.execute();
+    OrderedSuperRows<K, String, String, ByteBuffer> orderedRows = queryResult.get();
     return orderedRows.getList();
 
 

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
(original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Mon May 21 22:41:20 2012
@@ -19,10 +19,12 @@
 package org.apache.gora.cassandra.store;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -69,7 +71,17 @@ public class CassandraStore<K, T extends
   private Map<K, T> buffer = new LinkedHashMap<K, T>();
   
   public CassandraStore() throws Exception {
-    this.cassandraClient.initialize();
+    // this.cassandraClient.initialize();
+  }
+
+  public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties)
throws IOException {
+    super.initialize(keyClass, persistent, properties);
+    try {
+      this.cassandraClient.initialize(keyClass);
+    }
+    catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
   }
 
   @Override
@@ -139,22 +151,22 @@ public class CassandraStore<K, T extends
   private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
       CassandraResultSet cassandraResultSet) {
     // select family columns that are included in the query
-    List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery,
family);
+    List<Row<K, String, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery,
family);
     
-    for (Row<String, String, String> row : rows) {
-      String key = row.getKey();
+    for (Row<K, String, ByteBuffer> row : rows) {
+      K key = row.getKey();
       
       // find associated row in the resultset
-      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+      CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
-        cassandraRow = new CassandraRow();
+        cassandraRow = new CassandraRow<K>();
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
       
-      ColumnSlice<String, String> columnSlice = row.getColumnSlice();
+      ColumnSlice<String, ByteBuffer> columnSlice = row.getColumnSlice();
       
-      for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
+      for (HColumn<String, ByteBuffer> hColumn : columnSlice.getColumns()) {
         CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
         cassandraSubColumn.setValue(hColumn);
         cassandraSubColumn.setFamily(family);
@@ -167,18 +179,18 @@ public class CassandraStore<K, T extends
   private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,

       CassandraResultSet cassandraResultSet) {
     
-    List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery,
family);
-    for (SuperRow<String, String, String, String> superRow: superRows) {
-      String key = superRow.getKey();
-      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+    List<SuperRow<K, String, String, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery,
family);
+    for (SuperRow<K, String, String, ByteBuffer> superRow: superRows) {
+      K key = superRow.getKey();
+      CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
         cassandraRow = new CassandraRow();
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
       
-      SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
-      for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns())
{
+      SuperSlice<String, String, ByteBuffer> superSlice = superRow.getSuperSlice();
+      for (HSuperColumn<String, String, ByteBuffer> hSuperColumn: superSlice.getSuperColumns())
{
         CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
         cassandraSuperColumn.setValue(hSuperColumn);
         cassandraSuperColumn.setFamily(family);
@@ -209,7 +221,7 @@ public class CassandraStore<K, T extends
       Schema schema = value.getSchema();
       for (Field field: schema.getFields()) {
         if (value.isDirty(field.pos())) {
-          addOrUpdateField((String) key, field, value.get(field.pos()));
+          addOrUpdateField(key, field, value.get(field.pos()));
         }
       }
     }
@@ -222,7 +234,6 @@ public class CassandraStore<K, T extends
 
   @Override
   public T get(K key, String[] fields) throws IOException {
-    LOG.info("get " + key);
     CassandraQuery<K,T> query = new CassandraQuery<K,T>();
     query.setDataStore(this);
     query.setKeyRange(key, key);
@@ -244,13 +255,14 @@ public class CassandraStore<K, T extends
 
   @Override
   public String getSchemaName() {
-    LOG.info("get schema name");
     return null;
   }
 
   @Override
   public Query<K, T> newQuery() {
-    return new CassandraQuery<K, T>(this);
+    Query<K,T> query = new CassandraQuery<K, T>(this);
+    query.setFields(getFieldsToQuery(null));
+    return query;
   }
 
   /**
@@ -298,24 +310,16 @@ public class CassandraStore<K, T extends
    * @param field   the Avro field representing a datum
    * @param value   the field value
    */
-  private void addOrUpdateField(String key, Field field, Object value) {
+  private void addOrUpdateField(K key, Field field, Object value) {
     Schema schema = field.schema();
     Type type = schema.getType();
-    //LOG.info(field.name() + " " + type.name());
     switch (type) {
       case STRING:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
       case INT:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
       case LONG:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
       case BYTES:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
       case FLOAT:
+      case DOUBLE:
         this.cassandraClient.addColumn(key, field.name(), value);
         break;
       case RECORD:
@@ -333,6 +337,9 @@ public class CassandraStore<K, T extends
                 }
               }
               
+              if (memberValue instanceof Utf8) {
+                memberValue = memberValue.toString();
+              }
               this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
             }
           } else {
@@ -357,6 +364,9 @@ public class CassandraStore<K, T extends
                 }
               }
               
+              if (keyValue instanceof Utf8) {
+                keyValue = keyValue.toString();
+              }
               this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
             
             }
           } else {

Modified: gora/trunk/gora-tutorial/conf/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/conf/gora.properties?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-tutorial/conf/gora.properties (original)
+++ gora/trunk/gora-tutorial/conf/gora.properties Mon May 21 22:41:20 2012
@@ -17,10 +17,13 @@
 ##gora.datastore.default is the default detastore implementation to use 
 ##if it is not passed to the DataStoreFactory#createDataStore() method.
 gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
+#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore
 
 ##whether to create schema automatically if not exists.
 gora.datastore.autocreateschema=true
 
+##Cassandra properties for gora-cassandra module using Cassandra
+#gora.cassandrastore.servers=localhost:9160
 
 ##JDBC properties for gora-sql module using HSQL
 gora.sqlstore.jdbc.driver=org.hsqldb.jdbcDriver

Modified: gora/trunk/gora-tutorial/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/pom.xml?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-tutorial/pom.xml (original)
+++ gora/trunk/gora-tutorial/pom.xml Mon May 21 22:41:20 2012
@@ -126,6 +126,11 @@
 
         <dependency>
             <groupId>org.apache.gora</groupId>
+            <artifactId>gora-cassandra</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.gora</groupId>
             <artifactId>gora-sql</artifactId>
         </dependency>
 



Mime
View raw message