cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [2/3] git commit: Wide row support for pig. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909
Date Mon, 16 Apr 2012 22:42:10 GMT
Wide row support for pig.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6302d6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6302d6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6302d6d

Branch: refs/heads/trunk
Commit: a6302d6d7dd35d6b59e9911850f6bd2e69da088b
Parents: ac21a55
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Mon Apr 16 17:41:08 2012 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Mon Apr 16 17:41:08 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   87 ++++++++++++++-
 1 files changed, 86 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6302d6d/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index bcc0c79..aaac4ef 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -78,9 +78,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+    // name of the setting that switches loading to wide-row mode (rows are then read via getNextWide)
+    public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
 
     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+    // wide-row mode stays off unless the PIG_WIDEROW_INPUT setting is present
+    private final static boolean DEFAULT_WIDEROW_INPUT = false;
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -100,6 +102,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String inputFormatClass;
     private String outputFormatClass;
     private int limit;
+    // true when getNext() delegates to getNextWide()
+    private boolean widerows;
+    // wide-row paging state used by getNextWide(): the columns of a record that was read
+    // past the end of the row last returned, held until the next call
+    private Map<ByteBuffer,IColumn> lastRow;
+    // set by getNextWide() from the most recent reader.nextKeyValue() call
+    private boolean hasNext = true;
+
 
     public CassandraStorage()
     {
@@ -119,10 +126,85 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         return limit;
     }
+    
+    /**
+     * Returns the next row when wide-row input is enabled.
+     *
+     * The loop below assumes the reader may deliver one row as several consecutive
+     * (key, columns) records that share the same key. Records are consumed until the
+     * key changes, every column seen for the current key is collected into a bag, and
+     * the result is a tuple of (key, bag of column tuples).
+     *
+     * Detecting the end of a row requires reading the first record of the next row.
+     * That record's columns are parked in {@code lastRow} and the reader is not
+     * advanced again until the next call, so (per the RecordReader contract) its
+     * current key still identifies the parked columns when they are picked up.
+     *
+     * @return the next (key, columns) tuple, or null once the input is exhausted
+     * @throws IOException if the reader fails or the read is interrupted
+     */
+    public Tuple getNextWide() throws IOException
+    {
+        CfDef cfDef = getCfDef(loadSignature);
+        // the comparator is the same for every column, so parse it once per call
+        AbstractType comparator = parseType(cfDef.getComparator_type());
+        ByteBuffer key = null;
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        DefaultDataBag bag = new DefaultDataBag();
+        try
+        {
+            if (lastRow != null)
+            {
+                // The previous call read one record past the row it returned and parked its
+                // columns here. The reader has not moved since, so its current key is the key
+                // those columns belong to. Recording it now keeps them from being attributed
+                // to whatever key the next record carries.
+                key = (ByteBuffer)reader.getCurrentKey();
+                tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                {
+                    bag.add(columnToTuple(entry.getValue(), cfDef, comparator));
+                }
+                lastRow = null;
+            }
+            while (true)
+            {
+                hasNext = reader.nextKeyValue();
+                if (!hasNext)
+                {
+                    // input exhausted: emit the row in progress, if there is one
+                    if (key == null)
+                        return null;
+                    tuple.append(bag);
+                    return tuple;
+                }
+                ByteBuffer currentKey = (ByteBuffer)reader.getCurrentKey();
+                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                if (key == null) // first record of a new row: it supplies the tuple's key
+                {
+                    key = currentKey;
+                    tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                }
+                else if (!currentKey.equals(key)) // key changed: the current row is complete
+                {
+                    // read too much; hold on to it for the next call
+                    lastRow = row;
+                    tuple.append(bag);
+                    return tuple;
+                }
+                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+                {
+                    bag.add(columnToTuple(entry.getValue(), cfDef, comparator));
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt flag for callers and keep the original exception as the cause
+            Thread.currentThread().interrupt();
+            throw new IOException(e.getMessage(), e);
+        }
+    }
 
     @Override
     public Tuple getNext() throws IOException
     {
+        if (widerows)
+            return getNextWide();
         try
         {
             // load the next pair
@@ -424,7 +506,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
             ConfigHelper.setInputSlicePredicate(conf, predicate);
         }
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        // Wide-row mode is opt-in via the PIG_WIDEROW_INPUT environment variable. Read the
+        // value from the same source whose presence is checked; reading a system property
+        // here would yield null -> false whenever only the environment variable is set.
+        widerows = DEFAULT_WIDEROW_INPUT;
+        String widerowSetting = System.getenv(PIG_WIDEROW_INPUT);
+        if (widerowSetting != null)
+            widerows = Boolean.valueOf(widerowSetting);
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
         if (ConfigHelper.getInputRpcPort(conf) == 0)


Mime
View raw message