cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [1/3] git commit: Fix int/bigint in CassandraStorage Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102
Date Sat, 12 Oct 2013 16:50:52 GMT
Updated Branches:
  refs/heads/cassandra-2.0 e5dba3c62 -> 3e7ebf84f
  refs/heads/trunk b89cce9c8 -> c5b6a33ff


Fix int/bigint in CassandraStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e7ebf84
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e7ebf84
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e7ebf84

Branch: refs/heads/cassandra-2.0
Commit: 3e7ebf84f975729eb0735269b464b08cfedc7c6b
Parents: e5dba3c
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Sat Oct 12 11:40:10 2013 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Sat Oct 12 11:40:10 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 77 +++++++++++++-------
 .../cassandra/hadoop/pig/CassandraStorage.java  | 48 +++++++-----
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  9 ++-
 3 files changed, 82 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e7ebf84/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index c881734..9e26ddf 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -116,8 +116,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(Column col, CfInfo cfInfo, AbstractType comparator) throws IOException
     {
+        CfDef cfDef = cfInfo.cfDef;
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
         // name
@@ -128,13 +129,21 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        if (validators.get(col.name()) == null)
+        ByteBuffer colName;
+        if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+        {
+            ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+            colName = names[names.length-1];
+        }
+        else
+            colName = col.name();
+        if (validators.get(colName) == null)
         {
             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
             setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
         }
         else
-            setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+            setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
         return pair;
     }
 
@@ -154,11 +163,16 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     }
 
     /** get the columnfamily definition for the signature */
-    protected CfDef getCfDef(String signature) throws IOException
+    protected CfInfo getCfInfo(String signature) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        return cfdefFromString(property.getProperty(signature));
+        String prop = property.getProperty(signature);
+        CfInfo cfInfo = new CfInfo();
+        cfInfo.cfDef = cfdefFromString(prop.substring(2));
+        cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
+        cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
+        return cfInfo;
     }
 
     /** construct a map to store the mashaller type to cassandra data type mapping */
@@ -313,12 +327,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             return DataType.LONG;
         else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
             return DataType.INTEGER;
-        else if (type instanceof AsciiType || 
-                type instanceof UTF8Type ||
-                type instanceof DecimalType ||
-                type instanceof InetAddressType ||
-                type instanceof LexicalUUIDType ||
-                type instanceof UUIDType )
+        else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
             return DataType.CHARARRAY;
         else if (type instanceof FloatType)
             return DataType.FLOAT;
@@ -500,10 +509,14 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 }
 
                 // compose the CfDef for the columfamily
-                CfDef cfDef = getCfDef(client);
+                CfInfo cfInfo = getCfInfo(client);
 
-                if (cfDef != null)
-                    properties.setProperty(signature, cfdefToString(cfDef));
+                if (cfInfo.cfDef != null)
+                {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
+                    properties.setProperty(signature, sb.toString());
+                }
                 else
                     throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
                                                              column_family,
@@ -549,17 +562,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return cfDef;
     }
 
-    /** return the CfDef for the column family */
-    protected CfDef getCfDef(Cassandra.Client client)
+    /** return the CfInfo for the column family */
+    protected CfInfo getCfInfo(Cassandra.Client client)
             throws InvalidRequestException,
                    UnavailableException,
                    TimedOutException,
                    SchemaDisagreementException,
                    TException,
-                   CharacterCodingException,
                    NotFoundException,
                    org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException
+                   ConfigurationException,
+                   IOException
     {
         // get CF meta data
         String query = "SELECT type," +
@@ -613,12 +626,18 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             else
                 cql3Table = true;
         }
-        cfDef.column_metadata = getColumnMetadata(client, cql3Table);
-        return cfDef;
+        cfDef.column_metadata = getColumnMetadata(client);
+        CfInfo cfInfo = new CfInfo();
+        cfInfo.cfDef = cfDef;
+        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
+            cfInfo.compactCqlTable = true;
+        if (cql3Table)
+            cfInfo.cql3Table = true;; 
+        return cfInfo;
     }
 
     /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
@@ -735,7 +754,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     /** get a list of columns with defined index*/
     protected List<ColumnDef> getIndexes() throws IOException
     {
-        CfDef cfdef = getCfDef(loadSignature);
+        CfDef cfdef = getCfInfo(loadSignature).cfDef;
         List<ColumnDef> indexes = new ArrayList<ColumnDef>();
         for (ColumnDef cdef : cfdef.column_metadata)
         {
@@ -764,15 +783,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
     protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
     {
-        if (validator instanceof DecimalType ||
-                validator instanceof InetAddressType ||
-                validator instanceof LexicalUUIDType ||
-                validator instanceof UUIDType)
-        {
+        if (validator instanceof DecimalType || validator instanceof InetAddressType)
             return validator.getString(value);
-        }
         else
             return validator.compose(value);
     }
+
+    protected class CfInfo
+    {
+        boolean compactCqlTable = false;
+        boolean cql3Table = false;
+        CfDef cfDef;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e7ebf84/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 4083236..5357709 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -103,7 +103,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfDef cfDef = getCfDef(loadSignature);
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -126,7 +127,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -164,7 +165,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -181,14 +182,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -210,7 +211,8 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef(loadSignature);
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, Column> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -220,17 +222,26 @@ public class CassandraStorage extends AbstractCassandraStorage
 
             Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
             DefaultDataBag bag = new DefaultDataBag();
-
             // we must add all the indexed columns first to match the schema
             Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
             // take care to iterate these in the same order as the schema does
             for (ColumnDef cdef : cfDef.column_metadata)
             {
-                if (cf.containsKey(cdef.name))
+                boolean hasColumn = false;
+                boolean cql3Table = false;
+                try
                 {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+                    hasColumn = cf.containsKey(cdef.name);
                 }
-                else
+                catch (Exception e)
+                {
+                    cql3Table = true;                  
+                }
+                if (hasColumn)
+                {
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+                }
+                else if (!cql3Table)
                 {   // otherwise, we need to add an empty tuple to take its place
                     tuple.append(TupleFactory.getInstance().newTuple());
                 }
@@ -240,7 +251,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -248,7 +259,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -358,8 +369,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfDef cfDef = getCfDef(loadSignature);
-
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
         if (cfDef.column_type.equals("Super"))
             return null;
         /*
@@ -405,7 +416,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows)
+        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -699,12 +710,9 @@ public class CassandraStorage extends AbstractCassandraStorage
     }
 
     /** get a list of column for the column family */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)

+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) 
     throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
-    {
-        if (cql3Table)
-            return new ArrayList<>();
-        
+    {   
         return getColumnMeta(client, true, true);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e7ebf84/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 50ee6b7..d021cbd 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -94,7 +94,8 @@ public class CqlStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef(loadSignature);
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
             Map<String, ByteBuffer> keys = reader.getCurrentKey();
             Map<String, ByteBuffer> columns = reader.getCurrentValue();
             assert keys != null && columns != null;
@@ -280,8 +281,8 @@ public class CqlStorage extends AbstractCassandraStorage
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfDef cfDef = getCfDef(loadSignature);
-
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
@@ -430,7 +431,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
     
     /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,


Mime
View raw message