cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [2/3] cassandra git commit: Remove use of Cell in Thrift M/R classes
Date Thu, 04 Jun 2015 10:06:55 GMT
Remove use of Cell in Thrift M/R classes

Patch and review by Philip Thompson and Sam Tunnicliffe for
CASSANDRA-8609


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/19366575
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/19366575
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/19366575

Branch: refs/heads/trunk
Commit: 1936657570655129052cc48fa373c155086a6456
Parents: 16b0288
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Tue Jun 2 15:56:36 2015 +0100
Committer: Sam Tunnicliffe <sam@beobal.com>
Committed: Thu Jun 4 10:46:57 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 examples/hadoop_cql3_word_count/README.txt      |  14 +-
 .../bin/word_count_counters                     |   1 +
 .../conf/log4j.properties                       |  32 -
 .../hadoop_cql3_word_count/conf/logback.xml     |  42 +
 .../hadoop_cql3_word_count/src/WordCount.java   |   4 +-
 examples/hadoop_word_count/README.txt           |  10 +-
 .../hadoop_word_count/bin/word_count_counters   |   1 +
 .../hadoop_word_count/conf/log4j.properties     |  32 -
 examples/hadoop_word_count/conf/logback.xml     |  42 +
 examples/hadoop_word_count/src/WordCount.java   |  20 +-
 .../src/WordCountCounters.java                  |  24 +-
 .../hadoop/ColumnFamilyInputFormat.java         |   7 +-
 .../hadoop/ColumnFamilyRecordReader.java        | 117 +--
 .../hadoop/pig/AbstractCassandraStorage.java    | 796 -------------------
 .../cassandra/hadoop/pig/CassandraStorage.java  |  35 +-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  12 +-
 test/conf/logback-test.xml                      |   4 +-
 18 files changed, 218 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 933f5a6..882279f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Remove use of Cell in Thrift MapReduce classes (CASSANDRA-8609)
  * Integrate pre-release Java Driver 2.2-rc1, custom build (CASSANDRA-9493)
  * Clean up gossiper logic for old versions (CASSANDRA-9370)
  * Fix custom payload coding/decoding to match the spec (CASSANDRA-9515)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/README.txt b/examples/hadoop_cql3_word_count/README.txt
index b69bdd5..b6ee33f 100644
--- a/examples/hadoop_cql3_word_count/README.txt
+++ b/examples/hadoop_cql3_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
 and counts them, with RandomPartitioner. The word_count_counters example sums
 the value of counter columns for a key.
 
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/hadoop_cql3_word_count.
 
 
 Running
 =======
 
-First build and start a Cassandra server with the default configuration*, 
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting `start_rpc: true` in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
 
 contrib/word_count$ ant
 contrib/word_count$ bin/word_count_setup
@@ -22,14 +24,14 @@ contrib/word_count$ bin/word_count_counters
 In order to view the results in Cassandra, one can use bin/cqlsh and
 perform the following operations:
 $ bin/cqlsh localhost
-> use cql3_worldcount;
+> use cql3_wordcount;
 > select * from output_words;
 
 The output of the word count can now be configured. In the bin/word_count
 file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
 and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
 directories. The cassandra option outputs to the 'output_words' column family
-in the 'cql3_worldcount' keyspace.  'cassandra' is the default.
+in the 'cql3_wordcount' keyspace.  'cassandra' is the default.
 
 Read the code in src/ for more details.
 
@@ -45,5 +47,5 @@ settings accordingly.
 Troubleshooting
 ===============
 
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 0b69b40..52ea2e5 100755
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
     exit 1
 fi
 
+CLASSPATH=$CLASSPATH:$cwd/../conf
 CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/log4j.properties b/examples/hadoop_cql3_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_cql3_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/logback.xml b/examples/hadoop_cql3_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+  <jmxConfigurator />
+
+  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+    <file>wc.out</file>
+    <encoder>
+      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="FILE" />
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index 3702a2b..bc95736 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -44,7 +44,7 @@ import com.datastax.driver.core.Row;
 
 /**
  * This counts the occurrences of words in ColumnFamily
- *   cql3_worldcount ( id uuid,
+ *   cql3_wordcount ( id uuid,
  *                   line  text,
  *                   PRIMARY KEY (id))
  *
@@ -60,7 +60,7 @@ public class WordCount extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
     static final String INPUT_MAPPER_VAR = "input_mapper";
-    static final String KEYSPACE = "cql3_worldcount";
+    static final String KEYSPACE = "cql3_wordcount";
     static final String COLUMN_FAMILY = "inputs";
 
     static final String OUTPUT_REDUCER_VAR = "output_reducer";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/README.txt b/examples/hadoop_word_count/README.txt
index ec6f512..e336b89 100644
--- a/examples/hadoop_word_count/README.txt
+++ b/examples/hadoop_word_count/README.txt
@@ -5,14 +5,16 @@ WordCount hadoop example: Inserts a bunch of words across multiple rows,
 and counts them, with RandomPartitioner. The word_count_counters example sums
 the value of counter columns for a key.
 
-The scripts in bin/ assume you are running with cwd of contrib/word_count.
+The scripts in bin/ assume you are running with cwd of examples/hadoop_word_count.
 
 
 Running
 =======
 
-First build and start a Cassandra server with the default configuration*, 
-then run
+First build and start a Cassandra server with the default configuration*. Ensure that the Thrift
+interface is enabled, either by setting `start_rpc: true` in cassandra.yaml or by running
+`nodetool enablethrift` after startup.
+Once Cassandra has started and the Thrift interface is available, run
 
 contrib/word_count$ ant
 contrib/word_count$ bin/word_count_setup
@@ -45,4 +47,4 @@ settings accordingly.
 Troubleshooting
 ===============
 
-word_count uses conf/log4j.properties to log to wc.out.
+word_count uses conf/logback.xml to log to wc.out.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters
index 7793477..58c398c 100755
--- a/examples/hadoop_word_count/bin/word_count_counters
+++ b/examples/hadoop_word_count/bin/word_count_counters
@@ -30,6 +30,7 @@ if [ ! -e $cwd/../build/word_count.jar ]; then
     exit 1
 fi
 
+CLASSPATH=$CLASSPATH:$cwd/../conf
 CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
 CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/log4j.properties b/examples/hadoop_word_count/conf/log4j.properties
deleted file mode 100644
index 508d60f..0000000
--- a/examples/hadoop_word_count/conf/log4j.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=INFO,stdout,F
-
-#stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
-
-# log file
-log4j.appender.F=org.apache.log4j.FileAppender
-log4j.appender.F.Append=false
-log4j.appender.F.layout=org.apache.log4j.PatternLayout
-log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
-# Edit the next line to point to your logs directory
-log4j.appender.F.File=wc.out
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/conf/logback.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/conf/logback.xml b/examples/hadoop_word_count/conf/logback.xml
new file mode 100644
index 0000000..443bd1c
--- /dev/null
+++ b/examples/hadoop_word_count/conf/logback.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration scan="true">
+
+  <jmxConfigurator />
+
+  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+    <file>wc.out</file>
+    <encoder>
+      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="FILE" />
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java
index f6bca77..d092f1f 100644
--- a/examples/hadoop_word_count/src/WordCount.java
+++ b/examples/hadoop_word_count/src/WordCount.java
@@ -20,15 +20,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -71,7 +67,7 @@ public class WordCount extends Configured implements Tool
         System.exit(0);
     }
 
-    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, IntWritable>
+    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, IntWritable>
     {
         private final static IntWritable one = new IntWritable(1);
         private Text word = new Text();
@@ -82,17 +78,17 @@ public class WordCount extends Configured implements Tool
         {
         }
 
-        public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+        public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
         {
-            for (Cell cell : columns.values())
+            for (ColumnFamilyRecordReader.Column column : columns.values())
             {
-                String name  = ByteBufferUtil.string(cell.name().toByteBuffer());
+                String name  = ByteBufferUtil.string(column.name);
                 String value = null;
                 
                 if (name.contains("int"))
-                    value = String.valueOf(ByteBufferUtil.toInt(cell.value()));
+                    value = String.valueOf(ByteBufferUtil.toInt(column.value));
                 else
-                    value = ByteBufferUtil.string(cell.value());
+                    value = ByteBufferUtil.string(column.value);
                                
                 logger.debug("read {}:{}={} from {}",
                              new Object[] {ByteBufferUtil.string(key), name, value, context.getInputSplit()});

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/examples/hadoop_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountCounters.java b/examples/hadoop_word_count/src/WordCountCounters.java
index 39fb778..98c8579 100644
--- a/examples/hadoop_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_word_count/src/WordCountCounters.java
@@ -20,26 +20,26 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.SortedMap;
 
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.thrift.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
 /**
  * This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1".
  *
@@ -60,15 +60,15 @@ public class WordCountCounters extends Configured implements Tool
         System.exit(0);
     }
 
-    public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, LongWritable>
+    public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>, Text, LongWritable>
     {
-        public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context) throws IOException, InterruptedException
+        public void map(ByteBuffer key, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> columns, Context context) throws IOException, InterruptedException
         {
             long sum = 0;
-            for (Cell cell : columns.values())
+            for (ColumnFamilyRecordReader.Column column : columns.values())
             {
-                logger.debug("read " + key + ":" + cell.name() + " from " + context.getInputSplit());
-                sum += ByteBufferUtil.toLong(cell.value());
+                logger.debug("read " + key + ":" + ByteBufferUtil.string(column.name) + " from " + context.getInputSplit());
+                sum += ByteBufferUtil.toLong(column.value);
             }
             context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index f89825f..4662fa5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
@@ -57,7 +56,7 @@ import org.apache.thrift.transport.TTransportException;
  * The default split size is 64k rows.
  */
 @Deprecated
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
 
@@ -91,12 +90,12 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
         return client;
     }
 
-    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = HadoopCompat.newMapContext(
                 jobConf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index c103d75..aee730d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,9 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
@@ -49,8 +46,8 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
 @Deprecated
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
 
@@ -58,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private ColumnFamilySplit split;
     private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow;
+    private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
@@ -98,7 +95,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return currentRow.left;
     }
 
-    public SortedMap<ByteBuffer, Cell> getCurrentValue()
+    public SortedMap<ByteBuffer, Column> getCurrentValue()
     {
         return currentRow.right;
     }
@@ -216,7 +213,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
     {
         protected List<KeySlice> rows;
         protected int totalRead = 0;
@@ -283,50 +280,48 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             return totalRead;
         }
 
-        protected List<Cell> unthriftify(ColumnOrSuperColumn cosc)
+        protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc)
         {
             if (cosc.counter_column != null)
-                return Collections.<Cell>singletonList(unthriftifyCounter(cosc.counter_column));
+                return Collections.singletonList(unthriftifyCounter(cosc.counter_column));
             if (cosc.counter_super_column != null)
                 return unthriftifySuperCounter(cosc.counter_super_column);
             if (cosc.super_column != null)
                 return unthriftifySuper(cosc.super_column);
             assert cosc.column != null;
-            return Collections.<Cell>singletonList(unthriftifySimple(cosc.column));
+            return Collections.singletonList(unthriftifySimple(cosc.column));
         }
 
-        private List<Cell> unthriftifySuper(SuperColumn super_column)
+        private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column)
         {
-            List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+            List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
             for (org.apache.cassandra.thrift.Column column : super_column.columns)
             {
-                Cell c = unthriftifySimple(column);
-                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+                Pair<ByteBuffer, Column> c = unthriftifySimple(column);
+                columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
             }
-            return cells;
+            return columns;
         }
 
-        protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column)
+        protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column)
         {
-            return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp);
+            return Pair.create(column.name, Column.fromRegularColumn(column));
         }
 
-        private Cell unthriftifyCounter(CounterColumn column)
+        private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column)
         {
-            //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
-            //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell.
-            return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0);
+            return Pair.create(column.name, Column.fromCounterColumn(column));
         }
 
-        private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column)
+        private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column)
         {
-            List<Cell> cells = new ArrayList<Cell>(super_column.columns.size());
+            List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
             for (CounterColumn column : super_column.columns)
             {
-                Cell c = unthriftifyCounter(column);
-                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
+                Pair<ByteBuffer, Column> c = unthriftifyCounter(column);
+                columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
             }
-            return cells;
+            return columns;
         }
     }
 
@@ -405,7 +400,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
@@ -414,12 +409,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             totalRead++;
             KeySlice ks = rows.get(i++);
             AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
-            SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp);
+            SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp);
             for (ColumnOrSuperColumn cosc : ks.columns)
             {
-                List<Cell> cells = unthriftify(cosc);
-                for (Cell cell : cells)
-                    map.put(cell.name().toByteBuffer(), cell);
+                List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+                for (Pair<ByteBuffer, Column> column : columns)
+                    map.put(column.left, column.right);
             }
             return Pair.create(ks.key, map);
         }
@@ -427,7 +422,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
         private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
         private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -476,13 +471,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
 
-            Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next();
+            Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
             lastColumn = next.right.keySet().iterator().next().duplicate();
 
             maybeIncreaseRowCounter(next);
@@ -494,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))
@@ -504,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
         {
             private final Iterator<KeySlice> rows;
             private Iterator<ColumnOrSuperColumn> columns;
@@ -525,7 +520,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 columns = currentRow.columns.iterator();
             }
 
-            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
+            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
             {
                 AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
                 while (true)
@@ -533,20 +528,20 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                     if (columns.hasNext())
                     {
                         ColumnOrSuperColumn cosc = columns.next();
-                        SortedMap<ByteBuffer, Cell> map;
-                        List<Cell> cells = unthriftify(cosc);
-                        if (cells.size() == 1)
+                        SortedMap<ByteBuffer, Column> map;
+                        List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
+                        if (columns.size() == 1)
                         {
-                            map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0));
+                            map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right);
                         }
                         else
                         {
                             assert isSuper;
-                            map = new TreeMap<ByteBuffer, Cell>(comp);
-                            for (Cell cell : cells)
-                                map.put(cell.name().toByteBuffer(), cell);
+                            map = new TreeMap<>(comp);
+                            for (Pair<ByteBuffer, Column> column : columns)
+                                map.put(column.left, column.right);
                         }
-                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map);
+                        return Pair.create(currentRow.key, map);
                     }
 
                     if (!rows.hasNext())
@@ -563,7 +558,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     // to the old. Thus, expect a small performance hit.
     // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
     // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException
+    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
     {
         if (this.nextKeyValue())
         {
@@ -584,13 +579,37 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return ByteBuffer.wrap(new byte[this.keyBufferSize]);
     }
 
-    public SortedMap<ByteBuffer, Cell> createValue()
+    public SortedMap<ByteBuffer, Column> createValue()
     {
-        return new TreeMap<ByteBuffer, Cell>();
+        return new TreeMap<>();
     }
 
     public long getPos() throws IOException
     {
         return iter.rowsRead();
     }
+
+    public static final class Column
+    {
+        public final ByteBuffer name;
+        public final ByteBuffer value;
+        public final long timestamp;
+
+        private Column(ByteBuffer name, ByteBuffer value, long timestamp)
+        {
+            this.name = name;
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+
+        static Column fromRegularColumn(org.apache.cassandra.thrift.Column input)
+        {
+            return new Column(input.name, input.value, input.timestamp);
+        }
+
+        static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input)
+        {
+            return new Column(input.name, ByteBufferUtil.bytes(input.value), 0);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
deleted file mode 100644
index 263e6c0..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.pig;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.apache.cassandra.transport.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.pig.*;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- */
-public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
-{
-
-    protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
-    // system environment variables that can be set to configure connection info:
-    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
-    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
-    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
-    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
-    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
-    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
-    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
-    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
-    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
-    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
-    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
-    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
-    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
-    protected String DEFAULT_INPUT_FORMAT;
-    protected String DEFAULT_OUTPUT_FORMAT;
-
-    public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
-
-    protected String username;
-    protected String password;
-    protected String keyspace;
-    protected String column_family;
-    protected String loadSignature;
-    protected String storeSignature;
-
-    protected Configuration conf;
-    protected String inputFormatClass;
-    protected String outputFormatClass;
-    protected int splitSize = 64 * 1024;
-    protected String partitionerClass;
-    protected boolean usePartitionFilter = false;
-    protected String initHostAddress;
-    protected String rpcPort;
-    protected int nativeProtocolVersion = 1;
-
-
-    public AbstractCassandraStorage()
-    {
-        super();
-    }
-
-    /** Deconstructs a composite type to a Tuple. */
-    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
-    {
-        List<CompositeComponent> result = comparator.deconstruct(name);
-        Tuple t = TupleFactory.getInstance().newTuple(result.size());
-        for (int i=0; i<result.size(); i++)
-            setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value));
-
-        return t;
-    }
-
-    /** convert a column to a tuple */
-    protected Tuple columnToTuple(Cell col, CfInfo cfInfo, AbstractType comparator) throws IOException
-    {
-        CfDef cfDef = cfInfo.cfDef;
-        Tuple pair = TupleFactory.getInstance().newTuple(2);
-
-        ByteBuffer colName = col.name().toByteBuffer();
-
-        // name
-        if(comparator instanceof AbstractCompositeType)
-            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
-        else
-            setTupleValue(pair, 0, cassandraToObj(comparator, colName));
-
-        // value
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
-        {
-            ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName);
-            colName = names[names.length-1];
-        }
-        if (validators.get(colName) == null)
-        {
-            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-            setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
-        }
-        else
-            setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
-        return pair;
-    }
-
-    /** set the value to the position of the tuple */
-    protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException
-    {
-       if (value instanceof BigInteger)
-           pair.set(position, ((BigInteger) value).intValue());
-       else if (value instanceof ByteBuffer)
-           pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
-       else if (value instanceof UUID)
-           pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
-       else if (value instanceof Date)
-           pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
-       else
-           pair.set(position, value);
-    }
-
-    /** get the columnfamily definition for the signature */
-    protected CfInfo getCfInfo(String signature) throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        String prop = property.getProperty(signature);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfdefFromString(prop.substring(2));
-        cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
-        cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
-        return cfInfo;
-    }
-
-    /** construct a map to store the mashaller type to cassandra data type mapping */
-    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
-    {
-        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
-        AbstractType comparator;
-        AbstractType subcomparator;
-        AbstractType default_validator;
-        AbstractType key_validator;
-
-        comparator = parseType(cfDef.getComparator_type());
-        subcomparator = parseType(cfDef.getSubcomparator_type());
-        default_validator = parseType(cfDef.getDefault_validation_class());
-        key_validator = parseType(cfDef.getKey_validation_class());
-
-        marshallers.put(MarshallerType.COMPARATOR, comparator);
-        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
-        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
-        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
-        return marshallers;
-    }
-
-    /** get the validators */
-    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
-    {
-        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
-        for (ColumnDef cd : cfDef.getColumn_metadata())
-        {
-            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
-            {
-                AbstractType validator = null;
-                try
-                {
-                    validator = TypeParser.parse(cd.getValidation_class());
-                    if (validator instanceof CounterColumnType)
-                        validator = LongType.instance; 
-                    validators.put(cd.name, validator);
-                }
-                catch (ConfigurationException | SyntaxException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-        }
-        return validators;
-    }
-
-    /** parse the string to a cassandra data type */
-    protected AbstractType parseType(String type) throws IOException
-    {
-        try
-        {
-            // always treat counters like longs, specifically CCT.compose is not what we need
-            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
-                    return LongType.instance;
-            return TypeParser.parse(type);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-        catch (SyntaxException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public InputFormat getInputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(inputFormatClass, "inputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** decompose the query to store the parameters in a map */
-    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException 
-    {
-        String[] params = query.split("&");
-        Map<String, String> map = new HashMap<String, String>(params.length);
-        for (String param : params)
-        {
-            String[] keyValue = param.split("=");
-            map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8"));
-        }
-        return map;
-    }
-
-    /** set hadoop cassandra connection settings */
-    protected void setConnectionInformation() throws IOException
-    {
-        if (System.getenv(PIG_RPC_PORT) != null)
-        {
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-        }
-
-        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
-        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
-        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
-        {
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-        }
-        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
-        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
-        if (System.getenv(PIG_PARTITIONER) != null)
-        {
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        }
-        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
-        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-        if (System.getenv(PIG_INPUT_FORMAT) != null)
-            inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
-        else
-            inputFormatClass = DEFAULT_INPUT_FORMAT;
-        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
-            outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
-        else
-            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
-    }
-
-    /** get the full class name */
-    protected String getFullyQualifiedClassName(String classname)
-    {
-        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
-    }
-
-    /** get pig type for the cassandra data type*/
-    protected byte getPigType(AbstractType type)
-    {
-        if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
-            return DataType.LONG;
-        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
-            return DataType.INTEGER;
-        else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
-            return DataType.CHARARRAY;
-        else if (type instanceof FloatType)
-            return DataType.FLOAT;
-        else if (type instanceof DoubleType)
-            return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
-            return DataType.TUPLE;
-
-        return DataType.BYTEARRAY;
-    }
-
-    public ResourceStatistics getStatistics(String location, Job job)
-    {
-        return null;
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
-    {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature)
-    {
-        this.loadSignature = signature;
-    }
-
-    /** StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        this.storeSignature = signature;
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-
-    /** output format */
-    public OutputFormat getOutputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(outputFormatClass, "outputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
-    protected abstract ByteBuffer nullToBB();
-
-    /** convert object to ByteBuffer */
-    protected ByteBuffer objToBB(Object o)
-    {
-        if (o == null)
-            return nullToBB();
-        if (o instanceof java.lang.String)
-            return ByteBuffer.wrap(new DataByteArray((String)o).get());
-        if (o instanceof Integer)
-            return Int32Type.instance.decompose((Integer)o);
-        if (o instanceof Long)
-            return LongType.instance.decompose((Long)o);
-        if (o instanceof Float)
-            return FloatType.instance.decompose((Float)o);
-        if (o instanceof Double)
-            return DoubleType.instance.decompose((Double)o);
-        if (o instanceof UUID)
-            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
-        if(o instanceof Tuple) {
-            List<Object> objects = ((Tuple)o).getAll();
-            //collections
-            if (objects.size() > 0 && objects.get(0) instanceof String)
-            {
-                String collectionType = (String) objects.get(0);
-                if ("set".equalsIgnoreCase(collectionType) ||
-                        "list".equalsIgnoreCase(collectionType))
-                    return objToListOrSetBB(objects.subList(1, objects.size()));
-                else if ("map".equalsIgnoreCase(collectionType))
-                    return objToMapBB(objects.subList(1, objects.size()));
-                   
-            }
-            return objToCompositeBB(objects);
-        }
-
-        return ByteBuffer.wrap(((DataByteArray) o).get());
-    }
-
-    private ByteBuffer objToListOrSetBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-        for(Object sub : objects)
-        {
-            ByteBuffer buffer = objToBB(sub);
-            serialized.add(buffer);
-        }
-        // NOTE: using protocol v1 serialization format for collections so as to not break
-        // compatibility. Not sure if that's the right thing.
-        return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
-    }
-
-    private ByteBuffer objToMapBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
-        for(Object sub : objects)
-        {
-            List<Object> keyValue = ((Tuple)sub).getAll();
-            for (Object entry: keyValue)
-            {
-                ByteBuffer buffer = objToBB(entry);
-                serialized.add(buffer);
-            }
-        } 
-        // NOTE: using protocol v1 serialization format for collections so as to not break
-        // compatibility. Not sure if that's the right thing.
-        return CollectionSerializer.pack(serialized, objects.size(), Server.VERSION_1);
-    }
-
-    private ByteBuffer objToCompositeBB(List<Object> objects)
-    {
-        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-        int totalLength = 0;
-        for(Object sub : objects)
-        {
-            ByteBuffer buffer = objToBB(sub);
-            serialized.add(buffer);
-            totalLength += 2 + buffer.remaining() + 1;
-        }
-        ByteBuffer out = ByteBuffer.allocate(totalLength);
-        for (ByteBuffer bb : serialized)
-        {
-            int length = bb.remaining();
-            out.put((byte) ((length >> 8) & 0xFF));
-            out.put((byte) (length & 0xFF));
-            out.put(bb);
-            out.put((byte) 0);
-        }
-        out.flip();
-        return out;
-    }
-
-    public void cleanupOnFailure(String failure, Job job)
-    {
-    }
-
-    public void cleanupOnSuccess(String location, Job job) throws IOException {
-    }
-
-
-    /** Methods to get the column family schema from Cassandra */
-    protected void initSchema(String signature) throws IOException
-    {
-        Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
-
-        // Only get the schema if we haven't already gotten it
-        if (!properties.containsKey(signature))
-        {
-            try
-            {
-                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-                client.set_keyspace(keyspace);
-
-                if (username != null && password != null)
-                {
-                    Map<String, String> credentials = new HashMap<String, String>(2);
-                    credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
-                    credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
-
-                    try
-                    {
-                        client.login(new AuthenticationRequest(credentials));
-                    }
-                    catch (AuthenticationException e)
-                    {
-                        logger.error("Authentication exception: invalid username and/or password");
-                        throw new IOException(e);
-                    }
-                }
-
-                // compose the CfDef for the columfamily
-                CfInfo cfInfo = getCfInfo(client);
-
-                if (cfInfo.cfDef != null)
-                {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
-                    properties.setProperty(signature, sb.toString());
-                }
-                else
-                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
-                                                             column_family,
-                                                             keyspace));
-            }
-            catch (Exception e)
-            {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /** convert CfDef to string */
-    protected static String cfdefToString(CfDef cfDef) throws IOException
-    {
-        assert cfDef != null;
-        // this is so awful it's kind of cool!
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return Hex.bytesToHex(serializer.serialize(cfDef));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** convert string back to CfDef */
-    protected static CfDef cfdefFromString(String st) throws IOException
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        CfDef cfDef = new CfDef();
-        try
-        {
-            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-        return cfDef;
-    }
-
-    /** return the CfInfo for the column family */
-    protected CfInfo getCfInfo(Cassandra.Client client)
-            throws InvalidRequestException,
-                   UnavailableException,
-                   TimedOutException,
-                   SchemaDisagreementException,
-                   TException,
-                   NotFoundException,
-                   org.apache.cassandra.exceptions.InvalidRequestException,
-                   ConfigurationException,
-                   IOException
-    {
-        // get CF meta data
-        String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNFAMILIES,
-                                     keyspace,
-                                     column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        CfDef cfDef = new CfDef();
-        cfDef.keyspace = keyspace;
-        cfDef.name = column_family;
-        boolean cql3Table = false;
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-
-            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
-            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
-            ByteBuffer subComparator = cqlRow.columns.get(2).value;
-            if (subComparator != null)
-                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
-            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
-            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
-            if (FBUtilities.fromJsonList(keyAliases).size() > 0)
-                cql3Table = true;
-        }
-        cfDef.column_metadata = getColumnMetadata(client);
-        CfInfo cfInfo = new CfInfo();
-        cfInfo.cfDef = cfDef;
-        if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
-            cfInfo.compactCqlTable = true;
-        if (cql3Table)
-            cfInfo.cql3Table = true;; 
-        return cfInfo;
-    }
-
-    /** get a list of columns */
-    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException;
-
-    /** get column meta data */
-    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
-    {
-        String query = String.format("SELECT column_name, validator, index_type, type " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNS,
-                                     keyspace,
-                                     column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        List<CqlRow> rows = result.rows;
-        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-        if (rows == null || rows.isEmpty())
-        {
-            // if CassandraStorage, just return the empty list
-            if (cassandraStorage)
-                return columnDefs;
-
-            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
-            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-            for (ColumnDefinition def : cfm.regularAndStaticColumns())
-            {
-                ColumnDef cDef = new ColumnDef();
-                String columnName = def.name.toString();
-                String type = def.type.toString();
-                logger.debug("name: {}, type: {} ", columnName, type);
-                cDef.name = ByteBufferUtil.bytes(columnName);
-                cDef.validation_class = type;
-                columnDefs.add(cDef);
-            }
-            // we may not need to include the value column for compact tables as we 
-            // could have already processed it as schema_columnfamilies.value_alias
-            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
-            {
-                ColumnDefinition def = cfm.compactValueColumn();
-                if ("value".equals(def.name.toString()))
-                {
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = def.name.bytes;
-                    cDef.validation_class = def.type.toString();
-                    columnDefs.add(cDef);
-                }
-            }
-            return columnDefs;
-        }
-
-        Iterator<CqlRow> iterator = rows.iterator();
-        while (iterator.hasNext())
-        {
-            CqlRow row = iterator.next();
-            ColumnDef cDef = new ColumnDef();
-            String type = ByteBufferUtil.string(row.getColumns().get(3).value);
-            if (!type.equals("regular"))
-                continue;
-            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
-            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
-            ByteBuffer indexType = row.getColumns().get(2).value;
-            if (indexType != null)
-                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
-            columnDefs.add(cDef);
-        }
-        return columnDefs;
-    }
-
-    /** get index type from string */
-    protected IndexType getIndexType(String type)
-    {
-        type = type.toLowerCase();
-        if ("keys".equals(type))
-            return IndexType.KEYS;
-        else if("custom".equals(type))
-            return IndexType.CUSTOM;
-        else if("composites".equals(type))
-            return IndexType.COMPOSITES;
-        else
-            return null;
-    }
-
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job) throws IOException
-    {
-        if (!usePartitionFilter)
-            return null;
-        List<ColumnDef> indexes = getIndexes();
-        String[] partitionKeys = new String[indexes.size()];
-        for (int i = 0; i < indexes.size(); i++)
-        {
-            partitionKeys[i] = new String(indexes.get(i).getName());
-        }
-        return partitionKeys;
-    }
-
-    /** get a list of columns with defined index*/
-    protected List<ColumnDef> getIndexes() throws IOException
-    {
-        CfDef cfdef = getCfInfo(loadSignature).cfDef;
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
-    /** get CFMetaData of a column family */
-    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
-            throws NotFoundException,
-            InvalidRequestException,
-            TException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException
-    {
-        KsDef ksDef = client.describe_keyspace(ks);
-        for (CfDef cfDef : ksDef.cf_defs)
-        {
-            if (cfDef.name.equalsIgnoreCase(cf))
-                return ThriftConversion.fromThrift(cfDef);
-        }
-        return null;
-    }
-
-    protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
-    {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
-            return validator.getString(value);
-
-        if (validator instanceof CollectionType)
-        {
-            // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
-            // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
-            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
-        }
-
-        return validator.compose(value);
-    }
-
-    protected static class CfInfo
-    {
-        boolean compactCqlTable = false;
-        boolean cql3Table = false;
-        CfDef cfDef;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1ad80b7..5d354a7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -30,12 +30,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.PasswordAuthenticator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.schema.LegacySchemaTables;
@@ -83,7 +83,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private boolean slice_reverse = false;
     private boolean allow_deletes = false;
 
-    private RecordReader<ByteBuffer, Map<ByteBuffer, Cell>> reader;
+    private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
 
     private boolean widerows = false;
@@ -113,7 +113,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     
     // wide row hacks
     private ByteBuffer lastKey;
-    private Map<ByteBuffer, Cell> lastRow;
+    private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
     private boolean hasNext = true;
 
     public CassandraStorage()
@@ -164,7 +164,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                             key = reader.getCurrentKey();
                             tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                         }
-                        for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                        for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
@@ -202,7 +202,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                             tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         else
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
-                        for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                        for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
@@ -216,17 +216,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     else
                         addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                 }
-                SortedMap<ByteBuffer, Cell> row = (SortedMap<ByteBuffer, Cell>)reader.getCurrentValue();
+                SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
+                    (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
                 {
-                    for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
+                    for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
                     {
                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
-                for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
+                for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
                 {
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
@@ -251,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
             CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
-            Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
+            Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
             assert key != null && cf != null;
 
             // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -285,7 +286,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 added.put(cdef.name, true);
             }
             // now add all the other columns
-            for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
+            for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -1338,27 +1339,25 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     }
 
     /** convert a column to a tuple */
-    protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+    protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
-        ByteBuffer colName = col.name().toByteBuffer();
-
         // name
         if(comparator instanceof AbstractCompositeType)
-            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
         else
-            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        if (validators.get(colName) == null)
+        if (validators.get(column.name) == null)
         {
             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
         }
         else
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
         return pair;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 537f30c..dc3c174 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -34,9 +34,6 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -159,9 +156,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
                 ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
                 if (columnValue != null)
                 {
-                    Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
                     AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
-                    setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
+                    setTupleValue(tuple, i, cqlColumnToObj(ByteBufferUtil.bytes(cdef.getName()), columnValue,
+                                                           tableMetadata), validator);
                 }
                 else
                     tuple.set(i, null);
@@ -176,12 +173,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
+    private Object cqlColumnToObj(ByteBuffer name, ByteBuffer columnValue, TableInfo cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        ByteBuffer cellName = col.name().toByteBuffer();
-        return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
+        return StorageHelper.cassandraToObj(validators.get(name), columnValue, nativeProtocolVersion);
     }
 
     /** set the value to the position of the tuple */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/19366575/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8d99aa2..6d75aaf 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -61,7 +61,9 @@
       <level>WARN</level>
     </filter>
   </appender>
-        
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
   <root level="DEBUG">
     <appender-ref ref="ASYNCFILE" />
     <appender-ref ref="STDERR" />


Mime
View raw message