cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [4/6] git commit: Fix 5488 round 2
Date Sun, 26 May 2013 11:21:10 GMT
Fix 5488 round 2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2dd73d17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2dd73d17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2dd73d17

Branch: refs/heads/trunk
Commit: 2dd73d171068d743befcd445a14751032d232e4e
Parents: 0db9406
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Wed May 22 11:18:59 2013 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Wed May 22 11:19:05 2013 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   34 ++++++++++-----
 1 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2dd73d17/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index b681ee3..cf1c08f 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -130,7 +130,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
     {
         CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
-        Tuple tuple = TupleFactory.getInstance().newTuple();
+        Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
         try
         {
@@ -139,12 +139,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
                 hasNext = reader.nextKeyValue();
                 if (!hasNext)
                 {
+                    if (tuple == null)
+                        tuple = TupleFactory.getInstance().newTuple();
+
                     if (lastRow != null)
                     {
                         if (tuple.size() == 0) // lastRow is a new one
                         {
                             key = (ByteBuffer)reader.getCurrentKey();
-                            tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+                            tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                         }
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
@@ -180,7 +183,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
                     key = (ByteBuffer)reader.getCurrentKey();
                     if (lastKey != null && !(key.equals(lastKey))) // last key only
had one value
                     {
-                        tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                        if (tuple == null)
+                            tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                        else
+                            addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -190,7 +196,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
                         lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                         return tuple;
                     }
-                    tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                    if (tuple == null)
+                        tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+                    else
+                        addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
@@ -233,7 +242,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
             // output tuple, will hold the key, each indexed column in a tuple, then a bag
of the rest
             // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue
on it
 
-            Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class()));
+            Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
             DefaultDataBag bag = new DefaultDataBag();
 
             // we must add all the indexed columns first to match the schema
@@ -292,12 +301,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
         return t;
     }
 
-    private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator)
throws IOException
+    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws
IOException
+    {
+        Tuple tuple = TupleFactory.getInstance().newTuple(1);
+        addKeyToTuple(tuple, key, cfDef, comparator);
+        return tuple;
+    }
+
+    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator)
throws IOException
     {
-        if( tuple == null )
-        {
-            tuple = TupleFactory.getInstance().newTuple(1);
-        }
         if( comparator instanceof AbstractCompositeType )
         {
             setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
@@ -306,7 +318,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface,
Lo
         {
             setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
         }
-        return tuple;
+
     }
 
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws
IOException


Mime
View raw message