cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r1088800 - in /cassandra/branches/cassandra-0.7: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassandra/utils/
Date Mon, 04 Apr 2011 21:53:09 GMT
Author: brandonwilliams
Date: Mon Apr  4 21:53:09 2011
New Revision: 1088800

URL: http://svn.apache.org/viewvc?rev=1088800&view=rev
Log:
Pig uses schema information to cast to/from native types.
Patch by Jeremy Hanna and brandonwilliams, reviewed by brandonwilliams
for CASSANDRA-2387

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Mon Apr  4 21:53:09 2011
@@ -20,6 +20,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,9 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.avro.Mutation;
 import org.apache.cassandra.avro.Deletion;
 import org.apache.cassandra.avro.ColumnOrSuperColumn;
@@ -44,6 +46,14 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * A LoadFunc wrapping ColumnFamilyInputFormat.
@@ -58,6 +68,8 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
+    private static String UDFCONTEXT_SCHEMA_KEY = "schema";
+
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -72,8 +84,8 @@ public class CassandraStorage extends Lo
     private RecordWriter writer;
     private int limit;
 
-    public CassandraStorage() 
-    { 
+    public CassandraStorage()
+    {
         this(1024);
     }
 
@@ -100,19 +112,20 @@ public class CassandraStorage extends Lo
             if (!reader.nextKeyValue())
                 return null;
 
+            CfDef cfDef = getCfDef();
             ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
             SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
             assert key != null && cf != null;
             
             // and wrap it in a tuple
-	    Tuple tuple = TupleFactory.getInstance().newTuple(2);
+	        Tuple tuple = TupleFactory.getInstance().newTuple(2);
             ArrayList<Tuple> columns = new ArrayList<Tuple>();
             tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
-            {                    
-                columns.add(columnToTuple(entry.getKey(), entry.getValue()));
+            {
+                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
             }
-         
+
             tuple.set(1, new DefaultDataBag(columns));
             return tuple;
         }
@@ -122,28 +135,85 @@ public class CassandraStorage extends Lo
         }
     }
 
-    private Tuple columnToTuple(ByteBuffer name, IColumn col) throws IOException
+    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
-        pair.set(0, new DataByteArray(name.array(), name.position()+name.arrayOffset(), name.limit()+name.arrayOffset()));
+        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
         if (col instanceof Column)
         {
             // standard
-            pair.set(1, new DataByteArray(col.value().array(), 
-                                          col.value().position()+col.value().arrayOffset(),
-                                          col.value().limit()+col.value().arrayOffset()));
+            pair.set(0, marshallers.get(0).compose(name));
+            if (validators.get(name) == null)
+                // Have to special case BytesType because compose returns a ByteBuffer
+                if (marshallers.get(1) instanceof BytesType)
+                    pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value())));
+                else
+                    pair.set(1, marshallers.get(1).compose(col.value()));
+            else
+                pair.set(1, validators.get(name).compose(col.value()));
             return pair;
         }
 
         // super
         ArrayList<Tuple> subcols = new ArrayList<Tuple>();
         for (IColumn subcol : ((SuperColumn)col).getSubColumns())
-            subcols.add(columnToTuple(subcol.name(), subcol));
+            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
         
         pair.set(1, new DefaultDataBag(subcols));
         return pair;
     }
 
+    private CfDef getCfDef()
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(ResourceSchema.class);
+        return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
+    }
+
+    private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+    {
+        ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
+        AbstractType comparator = null;
+        AbstractType default_validator = null;
+        try
+        {
+            comparator = FBUtilities.getInstance(cfDef.comparator_type, "comparator");
+            default_validator = FBUtilities.getInstance(cfDef.default_validation_class, "validator");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+
+        marshallers.add(comparator);
+        marshallers.add(default_validator);
+        return marshallers;
+    }
+
+    private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws  IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+        for (ColumnDef cd : cfDef.column_metadata)
+        {
+            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+            {
+                AbstractType validator = null;
+                try
+                {
+                    validator = FBUtilities.getInstance(cd.getValidation_class(), "validator");
+                    validators.put(cd.name, validator);
+                }
+                catch (ConfigurationException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
+
     @Override
     public InputFormat getInputFormat()
     {
@@ -156,7 +226,7 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-    private void setLocationFromUri(String location) throws IOException
+     private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
         String names[];
@@ -219,6 +289,7 @@ public class CassandraStorage extends Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
+        initSchema();
     }
 
     @Override
@@ -274,7 +345,9 @@ public class CassandraStorage extends Lo
         ByteBuffer key = objToBB(t.get(0));
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-
+        CfDef cfDef = getCfDef();
+        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
         {
             for (Tuple pair : pairs)
@@ -306,7 +379,7 @@ public class CassandraStorage extends Lo
                        mutation.column_or_supercolumn.super_column = sc;
                    }
                }
-               else // assume column since it could be anything else
+               else // assume column since it couldn't be anything else
                {
                    if (pair.get(1) == null)
                    {
@@ -318,8 +391,15 @@ public class CassandraStorage extends Lo
                    else
                    {
                        org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
-                       column.name = objToBB(pair.get(0));
-                       column.value = objToBB(pair.get(1));
+                       column.name = marshallers.get(0).decompose((pair.get(0)));
+                       if (validators.get(column.name) == null)
+                           // Have to special case BytesType to convert DataByteArray into ByteBuffer
+                           if (marshallers.get(1) instanceof BytesType)
+                               column.value = ByteBuffer.wrap(((DataByteArray) pair.get(1)).get());
+                           else
+                               column.value = marshallers.get(1).decompose(pair.get(1));
+                       else
+                           column.value = validators.get(column.name).decompose(pair.get(1));
                        column.timestamp = System.currentTimeMillis() * 1000;
                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                        mutation.column_or_supercolumn.column = column;
@@ -358,4 +438,92 @@ public class CassandraStorage extends Lo
         return new RequiredFieldResponse(true);
     }
 
+
+    /* Methods to get the column family schema from Cassandra */
+
+    private void initSchema()
+    {
+        Cassandra.Client client = null;
+        try
+        {
+            client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+            CfDef cfDef = null;
+            client.set_keyspace(keyspace);
+            KsDef ksDef = client.describe_keyspace(keyspace);
+            List<CfDef> defs = ksDef.getCf_defs();
+            for (CfDef def : defs)
+            {
+                if (column_family.equalsIgnoreCase(def.getName()))
+                {
+                    cfDef = def;
+                    break;
+                }
+            }
+            UDFContext context = UDFContext.getUDFContext();
+            Properties property = context.getUDFProperties(ResourceSchema.class);
+            property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (NotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
+    {
+        TSocket socket = new TSocket(host, port);
+        TTransport trans = framed ? new TFramedTransport(socket) : socket;
+        try
+        {
+            trans.open();
+        }
+        catch (TTransportException e)
+        {
+            throw new IOException("unable to connect to server", e);
+        }
+        return new Cassandra.Client(new TBinaryProtocol(trans));
+    }
+
+    private static String cfdefToString(CfDef cfDef)
+    {
+        assert cfDef != null;
+        // this is so awful it's kind of cool!
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static CfDef cfdefFromString(String st)
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        CfDef cfDef = new CfDef();
+        try
+        {
+            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return cfDef;
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java
Mon Apr  4 21:53:09 2011
@@ -36,7 +36,7 @@ import static org.apache.cassandra.io.ss
  * should always handle those values even if they normally do not
  * represent a valid ByteBuffer for the type being compared.
  */
-public abstract class AbstractType implements Comparator<ByteBuffer>
+public abstract class AbstractType<T> implements Comparator<ByteBuffer>
 {
     public final Comparator<IndexInfo> indexComparator;
     public final Comparator<IndexInfo> indexReverseComparator;
@@ -108,6 +108,10 @@ public abstract class AbstractType imple
         throw new UnsupportedOperationException();
     }
 
+    public abstract T compose(ByteBuffer bytes);
+
+    public abstract ByteBuffer decompose(T value);
+
     /* validate that the byte array is a valid sequence for the type we are supposed to be comparing */
     public abstract void validate(ByteBuffer bytes) throws MarshalException;
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java
Mon Apr  4 21:53:09 2011
@@ -28,12 +28,32 @@ import com.google.common.base.Charsets;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class AsciiType extends BytesType
+public class AsciiType extends AbstractType<String>
 {
     public static final AsciiType instance = new AsciiType();
 
+    public static AsciiType getInstance()
+    {
+        return instance;
+    }
+
     AsciiType() {} // singleton
 
+    public String compose(ByteBuffer bytes)
+    {
+        return getString(bytes);
+    }
+
+    public ByteBuffer decompose(String value)
+    {
+        return ByteBufferUtil.bytes(value, Charsets.US_ASCII);
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        return BytesType.bytesCompare(o1, o2);
+    }
+
     @Override
     public String getString(ByteBuffer bytes)
     {
@@ -49,7 +69,7 @@ public class AsciiType extends BytesType
 
     public ByteBuffer fromString(String source)
     {
-        return ByteBufferUtil.bytes(source, Charsets.US_ASCII);
+        return decompose(source);
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java
Mon Apr  4 21:53:09 2011
@@ -25,19 +25,39 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class BytesType extends AbstractType
+public class BytesType extends AbstractType<ByteBuffer>
 {
     public static final BytesType instance = new BytesType();
 
+    public static BytesType getInstance()
+    {
+        return instance;
+    }
+
     BytesType() {} // singleton
-    
+
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        return bytes.duplicate();
+    }
+
+    public ByteBuffer decompose(ByteBuffer value)
+    {
+        return value;
+    }
+
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
+        return BytesType.bytesCompare(o1, o2);
+    }
+
+    public static int bytesCompare(ByteBuffer o1, ByteBuffer o2)
+    {
         if(null == o1){
             if(null == o2) return 0;
             else return -1;
         }
-              
+
         return ByteBufferUtil.compareUnsigned(o1, o2);
     }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java
Mon Apr  4 21:53:09 2011
@@ -22,12 +22,19 @@ package org.apache.cassandra.db.marshal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.TBaseHelper;
 
-public final class IntegerType extends AbstractType
+public final class IntegerType extends AbstractType<BigInteger>
 {
     public static final IntegerType instance = new IntegerType();
 
+    public static IntegerType getInstance()
+    {
+        return instance;
+    }
+
+
     private static int findMostSignificantByte(ByteBuffer bytes)
     {
         int len = bytes.remaining() - 1;
@@ -56,6 +63,16 @@ public final class IntegerType extends A
 
     IntegerType() {/* singleton */}
 
+    public BigInteger compose(ByteBuffer bytes)
+    {
+        return new BigInteger(ByteBufferUtil.getArray(bytes));
+    }
+
+    public ByteBuffer decompose(BigInteger value)
+    {
+        return ByteBuffer.wrap(value.toByteArray());
+    }
+
     public int compare(ByteBuffer lhs, ByteBuffer rhs)
     {
         int lhsLen = lhs.remaining();
@@ -138,7 +155,7 @@ public final class IntegerType extends A
             throw new RuntimeException("'" + source + "' could not be translated into an IntegerType.");
         }
 
-        return ByteBuffer.wrap(integerType.toByteArray());
+        return decompose(integerType);
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
Mon Apr  4 21:53:09 2011
@@ -26,12 +26,27 @@ import java.util.UUID;
 
 import org.apache.cassandra.utils.UUIDGen;
 
-public class LexicalUUIDType extends AbstractType
+public class LexicalUUIDType extends AbstractType<UUID>
 {
     public static final LexicalUUIDType instance = new LexicalUUIDType();
 
+    public static LexicalUUIDType getInstance()
+    {
+        return instance;
+    }
+
     LexicalUUIDType() {} // singleton
 
+    public UUID compose(ByteBuffer bytes)
+    {
+        return UUIDGen.getUUID(bytes);
+    }
+
+    public ByteBuffer decompose(UUID value)
+    {
+        return ByteBuffer.wrap(UUIDGen.decompose(value));
+    }
+
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         if (o1.remaining() == 0)
@@ -61,7 +76,7 @@ public class LexicalUUIDType extends Abs
 
     public ByteBuffer fromString(String source)
     {
-        return ByteBuffer.wrap(UUIDGen.decompose(UUID.fromString(source)));
+        return decompose(UUID.fromString(source));
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
Mon Apr  4 21:53:09 2011
@@ -29,7 +29,7 @@ import org.apache.cassandra.utils.ByteBu
 
 /** for sorting columns representing row keys in the row ordering as determined by a partitioner.
  * Not intended for user-defined CFs, and will in fact error out if used with such. */
-public class LocalByPartionerType<T extends Token> extends AbstractType
+public class LocalByPartionerType<T extends Token> extends AbstractType<ByteBuffer>
 {
     private final IPartitioner<T> partitioner;
 
@@ -38,6 +38,16 @@ public class LocalByPartionerType<T exte
         this.partitioner = partitioner;
     }
 
+    public ByteBuffer compose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
+    public ByteBuffer decompose(ByteBuffer bytes)
+    {
+        throw new UnsupportedOperationException("You can't do this with a local partitioner.");
+    }
+
     public String getString(ByteBuffer bytes)
     {
         return ByteBufferUtil.bytesToHex(bytes);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java
Mon Apr  4 21:53:09 2011
@@ -25,12 +25,27 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class LongType extends AbstractType
+public class LongType extends AbstractType<Long>
 {
     public static final LongType instance = new LongType();
 
+    public static LongType getInstance()
+    {
+        return instance;
+    }
+
     LongType() {} // singleton
 
+    public Long compose(ByteBuffer bytes)
+    {
+        return ByteBufferUtil.toLong(bytes);
+    }
+
+    public ByteBuffer decompose(Long value)
+    {
+        return ByteBufferUtil.bytes(value);
+    }
+
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         if (o1.remaining() == 0)
@@ -78,7 +93,7 @@ public class LongType extends AbstractTy
             throw new RuntimeException("'" + source + "' could not be translated into a LongType.");
         }
 
-        return ByteBufferUtil.bytes(longType);
+        return decompose(longType);
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
Mon Apr  4 21:53:09 2011
@@ -26,12 +26,27 @@ import java.util.UUID;
 
 import org.apache.cassandra.utils.UUIDGen;
 
-public class TimeUUIDType extends AbstractType
+public class TimeUUIDType extends AbstractType<UUID>
 {
     public static final TimeUUIDType instance = new TimeUUIDType();
 
+    public static TimeUUIDType getInstance()
+    {
+        return instance;
+    }
+
     TimeUUIDType() {} // singleton
 
+    public UUID compose(ByteBuffer bytes)
+    {
+        return UUIDGen.getUUID(bytes);
+    }
+
+    public ByteBuffer decompose(UUID value)
+    {
+        return ByteBuffer.wrap(UUIDGen.decompose(value));
+    }
+
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         if (o1.remaining() == 0)
@@ -102,7 +117,7 @@ public class TimeUUIDType extends Abstra
         if (uuid.version() != 1)
             throw new IllegalArgumentException("TimeUUID supports only version 1 UUIDs");
 
-        return ByteBuffer.wrap(UUIDGen.decompose(uuid));
+        return decompose(uuid);
     }
 
     public void validate(ByteBuffer bytes) throws MarshalException

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
Mon Apr  4 21:53:09 2011
@@ -23,14 +23,35 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 
+import com.google.common.base.Charsets;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class UTF8Type extends BytesType
+public class UTF8Type extends AbstractType<String>
 {
     public static final UTF8Type instance = new UTF8Type();
 
+    public static UTF8Type getInstance()
+    {
+        return instance;
+    }
+
     UTF8Type() {} // singleton
 
+    public String compose(ByteBuffer bytes)
+    {
+        return getString(bytes);
+    }
+
+    public ByteBuffer decompose(String value)
+    {
+        return ByteBufferUtil.bytes(value, Charsets.UTF_8);
+    }
+
+    public int compare(ByteBuffer o1, ByteBuffer o2)
+    {
+        return BytesType.bytesCompare(o1, o2);
+    }
+
     public String getString(ByteBuffer bytes)
     {
         try
@@ -45,9 +66,9 @@ public class UTF8Type extends BytesType
 
     public ByteBuffer fromString(String source)
     {
-        return ByteBufferUtil.bytes(source);
+        return decompose(source);
     }
-    
+
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         if (!UTF8Validator.validate(bytes.slice()))

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
Mon Apr  4 21:53:09 2011
@@ -377,6 +377,21 @@ public class ByteBufferUtil
         return bytes.getInt(bytes.position());
     }
 
+    public static long toLong(ByteBuffer bytes)
+    {
+        return bytes.getLong(bytes.position());
+    }
+
+    public static float toFloat(ByteBuffer bytes)
+    {
+        return bytes.getFloat(bytes.position());
+    }
+
+    public static double toDouble(ByteBuffer bytes)
+    {
+        return bytes.getDouble(bytes.position());
+    }
+
     public static ByteBuffer bytes(int i)
     {
         return ByteBuffer.allocate(4).putInt(0, i);
@@ -387,6 +402,17 @@ public class ByteBufferUtil
         return ByteBuffer.allocate(8).putLong(0, n);
     }
 
+    public static ByteBuffer bytes(float f)
+    {
+        return ByteBuffer.allocate(4).putFloat(0, f);
+    }
+
+    public static ByteBuffer bytes(double d)
+    {
+        return ByteBuffer.allocate(8).putDouble(0, d);
+    }
+
+
     public static InputStream inputStream(ByteBuffer bytes)
     {
         final ByteBuffer copy = bytes.duplicate();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1088800&r1=1088799&r2=1088800&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
Mon Apr  4 21:53:09 2011
@@ -543,6 +543,31 @@ public class FBUtilities
         }
     }
 
+    public static <T> T getInstance(String classname, String readable) throws ConfigurationException
+    {
+        Class cls = classForName(classname,  readable);
+        T rval = null;
+        try
+        {
+            rval = (T) cls.getDeclaredMethod("getInstance").invoke(null, (Object) null);
+
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new ConfigurationException("Class does not have the getInstance method with no arguments");
+        }
+        catch (InvocationTargetException e)
+        {
+            throw new ConfigurationException(String.format("Could not call method getInstance on %s class %s", readable, classname));
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new ConfigurationException(String.format("Could not call method getInstance on %s class %s", readable, classname));
+        }
+
+        return rval;
+    }
+
     public static <T extends Comparable> SortedSet<T> singleton(T column)
     {
         return new TreeSet<T>(Arrays.asList(column));



Mime
View raw message