cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Pete Warden (Created) (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CASSANDRA-3371) Cassandra inferred schema and actual data don't match
Date Mon, 17 Oct 2011 10:26:11 GMT
Cassandra inferred schema and actual data don't match
-----------------------------------------------------

                 Key: CASSANDRA-3371
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
             Project: Cassandra
          Issue Type: Bug
          Components: Hadoop
    Affects Versions: 0.8.7
            Reporter: Pete Warden


It's looking like there may be a mismatch between the schema that's being reported by the
latest CassandraStorage.java, and the data that's actually returned. Here's an example:

rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
DESCRIBE rows;
rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner:
bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string:
bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type:
chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
DUMP rows;
(691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu
Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})

getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16
values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which
contains two values. 

It appears that things got out of sync with this change:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083

See more discussion at:
http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html

Here's a patch I ended up creating for my own use, which gives the results I need (though
it doesn't handle super-columns):
DESCRIBE rows;
rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string:
bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}

Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
===================================================================
--- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(revision 1185044)
+++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(working copy)
@@ -26,7 +26,7 @@
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -122,15 +122,15 @@
             assert key != null && cf != null;
             
             // and wrap it in a tuple
-	        Tuple tuple = TupleFactory.getInstance().newTuple(2);
+	        Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
             ArrayList<Tuple> columns = new ArrayList<Tuple>();
-            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(),
key.limit()+key.arrayOffset()));
+            int tupleIndex = 0;
+            tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(),
key.limit()+key.arrayOffset()));            
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
             {
-                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
+                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef));
             }
 
-            tuple.set(1, new DefaultDataBag(columns));
             return tuple;
         }
         catch (InterruptedException e)
@@ -139,30 +139,22 @@
         }
     }
 
-    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
+    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
     {
-        Tuple pair = TupleFactory.getInstance().newTuple(2);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
-        setTupleValue(pair, 0, marshallers.get(0).compose(name));
         if (col instanceof Column)
         {
             // standard
             if (validators.get(name) == null)
-                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
+                return marshallers.get(1).compose(col.value());
             else
-                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
-            return pair;
+                return validators.get(name).compose(col.value());
         }
 
-        // super
-        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-        for (IColumn subcol : col.getSubColumns())
-            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
-        
-        pair.set(1, new DefaultDataBag(subcols));
-        return pair;
+        // super not currently handled
+        return null;
     }
 
     private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
@@ -312,62 +304,32 @@
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
+        ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1];
+        int tupleIndex = 0;
+        
         // add key
         ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
-        keyFieldSchema.setName("key");
+        keyFieldSchema.setName("cassandra_key");
         keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+        tupleFields[tupleIndex++] = keyFieldSchema;
 
-        // will become the bag of tuples
-        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
-        bagFieldSchema.setName("columns");
-        bagFieldSchema.setType(DataType.BAG);
-        ResourceSchema bagSchema = new ResourceSchema();
-
-
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
 
-        // default comparator/validator
-        ResourceSchema innerTupleSchema = new ResourceSchema();
-        ResourceFieldSchema tupleField = new ResourceFieldSchema();
-        tupleField.setType(DataType.TUPLE);
-        tupleField.setSchema(innerTupleSchema);
-
-        ResourceFieldSchema colSchema = new ResourceFieldSchema();
-        colSchema.setName("name");
-        colSchema.setType(getPigType(marshallers.get(0)));
-        tupleFields.add(colSchema);
-
-        ResourceFieldSchema valSchema = new ResourceFieldSchema();
-        AbstractType validator = marshallers.get(1);
-        valSchema.setName("value");
-        valSchema.setType(getPigType(validator));
-        tupleFields.add(valSchema);
-
         // defined validators/indexes
         for (ColumnDef cdef : cfDef.column_metadata)
         {
-            colSchema = new ResourceFieldSchema();
-            colSchema.setName(new String(cdef.getName()));
-            colSchema.setType(getPigType(marshallers.get(0)));
-            tupleFields.add(colSchema);
-
-            valSchema = new ResourceFieldSchema();
-            validator = validators.get(cdef.getName());
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.getName());
             if (validator == null)
                 validator = marshallers.get(1);
-            valSchema.setName("value");
+            valSchema.setName(new String(cdef.getName()));
             valSchema.setType(getPigType(validator));
-            tupleFields.add(valSchema);
+            tupleFields[tupleIndex++] = valSchema;
         }
-        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
 
-        // a bag can contain only one tuple, but that tuple can contain anything
-        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
-        bagFieldSchema.setSchema(bagSchema);
         // top level schema contains everything
-        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
+        schema.setFields(tupleFields);
         return schema;
     }
 
@@ -601,7 +563,7 @@
         TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
         try
         {
-            return Hex.bytesToHex(serializer.serialize(cfDef));
+            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
         }
         catch (TException e)
         {
@@ -616,7 +578,7 @@
         CfDef cfDef = new CfDef();
         try
         {
-            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
         }
         catch (TException e)
         {


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message