cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r1177084 - /cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Date Wed, 28 Sep 2011 22:01:08 GMT
Author: brandonwilliams
Date: Wed Sep 28 22:01:08 2011
New Revision: 1177084

URL: http://svn.apache.org/viewvc?rev=1177084&view=rev
Log:
Fix handling of integer types in pig.
Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-2810

Modified:
    cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1177084&r1=1177083&r2=1177084&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Wed Sep 28 22:01:08 2011
@@ -17,11 +17,13 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -143,18 +145,14 @@ public class CassandraStorage extends Lo
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
+        setTupleValue(pair, 0, marshallers.get(0).compose(name));
         if (col instanceof Column)
         {
             // standard
-            pair.set(0, marshallers.get(0).compose(name));
             if (validators.get(name) == null)
-                // Have to special case BytesType because compose returns a ByteBuffer
-                if (marshallers.get(1) instanceof BytesType)
-                    pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value())));
-                else
-                    pair.set(1, marshallers.get(1).compose(col.value()));
+                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
             else
-                pair.set(1, validators.get(name).compose(col.value()));
+                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
             return pair;
         }
 
@@ -167,6 +165,16 @@ public class CassandraStorage extends Lo
         return pair;
     }
 
+    private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+    {
+       if (value instanceof BigInteger)
+           pair.set(position, ((BigInteger) value).intValue());
+       else if (value instanceof ByteBuffer)
+           pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+       else
+           pair.set(position, value);
+    }
+
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
@@ -453,8 +461,6 @@ public class CassandraStorage extends Lo
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
         CfDef cfDef = getCfDef(storeSignature);
-        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
         {
             for (Tuple pair : pairs)
@@ -498,15 +504,8 @@ public class CassandraStorage extends Lo
                    else
                    {
                        org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                       column.name = marshallers.get(0).decompose((pair.get(0)));
-                       if (validators.get(column.name) == null)
-                           // Have to special case BytesType to convert DataByteArray into
ByteBuffer
-                           if (marshallers.get(1) instanceof BytesType)
-                               column.value = objToBB(pair.get(1));
-                           else
-                               column.value = marshallers.get(1).decompose(pair.get(1));
-                       else
-                           column.value = validators.get(column.name).decompose(pair.get(1));
+                       column.name = objToBB(pair.get(0));
+                       column.value = objToBB(pair.get(1));
                        column.setTimestamp(System.currentTimeMillis() * 1000);
                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                        mutation.column_or_supercolumn.column = column;
@@ -626,3 +625,4 @@ public class CassandraStorage extends Lo
         return cfDef;
     }
 }
+



Mime
View raw message