cassandra-commits mailing list archives

From slebre...@apache.org
Subject [1/3] git commit: Add CqlRecordReader to take advantage of native CQL pagination
Date Wed, 26 Mar 2014 14:38:38 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 917e3872d -> 3d399259d


Add CqlRecordReader to take advantage of native CQL pagination

patch by alexliu68; reviewed by pkolaczk for CASSANDRA-6311


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d7cb9700
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d7cb9700
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d7cb9700

Branch: refs/heads/trunk
Commit: d7cb9700538c6ad9921902f81c081eef77a037b7
Parents: 58e524e
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Mar 26 15:34:59 2014 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Mar 26 15:34:59 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   4 +-
 examples/hadoop_cql3_word_count/bin/word_count  |   3 +-
 .../bin/word_count_counters                     |   4 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |  77 ++-
 .../src/WordCountCounters.java                  |  54 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  | 553 ++++++++++++++++++-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  80 +++
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 487 ++++++++++++++++
 10 files changed, 1235 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
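For orientation: the essence of the change is that a Hadoop job can now read CQL rows over the native protocol by pairing the new CqlInputFormat with an input CQL query. A minimal sketch of the wiring (not part of the commit; the keyspace, table, and query are taken from the word count example changes below):

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NativeReaderJobSketch
{
    public static Job configure(Configuration conf) throws Exception
    {
        Job job = new Job(conf, "wordcount");
        // The new input format hands the mapper (Long, com.datastax.driver.core.Row) pairs
        job.setInputFormatClass(CqlInputFormat.class);

        // Same input helpers as the thrift-based reader
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "cql3_worldcount", "inputs");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        // The query must select the partition key and bind the split's token range
        CqlConfigHelper.setInputCql(job.getConfiguration(),
            "select * from inputs where token(id) > ? and token(id) <= ? allow filtering");
        return job;
    }
}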


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00b98fa..e971df1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@
  * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
  * Improve MeteredFlusher handling of MF-unaffected column families
    (CASSANDRA-6867)
+ * Add CqlRecordReader using native pagination (CASSANDRA-6311)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 464dece..a15415b 100644
--- a/build.xml
+++ b/build.xml
@@ -380,6 +380,7 @@
           <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
         </dependencyManagement>
         <developer id="alakshman" name="Avinash Lakshman"/>
@@ -410,7 +411,7 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
         <dependency groupId="org.apache.pig" artifactId="pig"/>
-
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
       </artifact:pom>
 
@@ -473,6 +474,7 @@
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
+      	<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
 
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count
index a0c5aa0..974a39a 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count
+++ b/examples/hadoop_cql3_word_count/bin/word_count
@@ -56,6 +56,7 @@ if [ "x$JAVA" = "x" ]; then
 fi
 
 OUTPUT_REDUCER=cassandra
+INPUT_MAPPER=native
 
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 7793477..0b69b40 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -54,5 +54,7 @@ if [ "x$JAVA" = "x" ]; then
     exit 1
 fi
 
+INPUT_MAPPER=native
+
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index bc81a53..519a98f 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -37,10 +38,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import com.datastax.driver.core.Row;
 import java.nio.charset.CharacterCodingException;
 
 /**
@@ -60,7 +62,7 @@ import java.nio.charset.CharacterCodingException;
 public class WordCount extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
-
+    static final String INPUT_MAPPER_VAR = "input_mapper";
     static final String KEYSPACE = "cql3_worldcount";
     static final String COLUMN_FAMILY = "inputs";
 
@@ -68,7 +70,6 @@ public class WordCount extends Configured implements Tool
     static final String OUTPUT_COLUMN_FAMILY = "output_words";
 
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
-
     private static final String PRIMARY_KEY = "row_key";
 
     public static void main(String[] args) throws Exception
@@ -108,6 +109,30 @@ public class WordCount extends Configured implements Tool
         }
     }
 
+    public static class NativeTokenizerMapper extends Mapper<Long, Row, Text, IntWritable>
+    {
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+        private ByteBuffer sourceColumn;
+
+        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException
+        {
+        }
+
+        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
+        {
+            String value = row.getString("line");
+            logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()});
+            StringTokenizer itr = new StringTokenizer(value);
+            while (itr.hasMoreTokens())
+            {
+                word.set(itr.nextToken());
+                context.write(word, one);
+            }
+        }
+    }
+
     public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
     {
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -149,17 +174,41 @@ public class WordCount extends Configured implements Tool
     public int run(String[] args) throws Exception
     {
         String outputReducerType = "filesystem";
-        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
+        String inputMapperType = "native";
+        String outputReducer = null;
+        String inputMapper = null;
+
+        if (args != null)
         {
-            String[] s = args[0].split("=");
+            if(args[0].startsWith(OUTPUT_REDUCER_VAR))
+                outputReducer = args[0];
+            if(args[0].startsWith(INPUT_MAPPER_VAR))
+                inputMapper = args[0];
+            
+            if (args.length == 2)
+            {
+                if(args[1].startsWith(OUTPUT_REDUCER_VAR))
+                    outputReducer = args[1];
+                if(args[1].startsWith(INPUT_MAPPER_VAR))
+                    inputMapper = args[1]; 
+            }
+        }
+
+        if (outputReducer != null)
+        {
+            String[] s = outputReducer.split("=");
             if (s != null && s.length == 2)
                 outputReducerType = s[1];
         }
         logger.info("output reducer type: " + outputReducerType);
-
+        if (inputMapper != null)
+        {
+            String[] s = inputMapper.split("=");
+            if (s != null && s.length == 2)
+                inputMapperType = s[1];
+        }
         Job job = new Job(getConf(), "wordcount");
         job.setJarByClass(WordCount.class);
-        job.setMapperClass(TokenizerMapper.class);
 
         if (outputReducerType.equalsIgnoreCase("filesystem"))
         {
@@ -189,9 +238,19 @@ public class WordCount extends Configured implements Tool
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         }
 
-        job.setInputFormatClass(CqlPagingInputFormat.class);
+        if (inputMapperType.equalsIgnoreCase("native"))
+        {
+            job.setMapperClass(NativeTokenizerMapper.class);
+            job.setInputFormatClass(CqlInputFormat.class);
+            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering");
+        }
+        else
+        {
+            job.setMapperClass(TokenizerMapper.class);
+            job.setInputFormatClass(CqlPagingInputFormat.class);
+            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        }
 
-        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 542a473..74de9ab 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import com.datastax.driver.core.Row;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -51,6 +52,7 @@ public class WordCountCounters extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
 
+    static final String INPUT_MAPPER_VAR = "input_mapper";
     static final String COUNTER_COLUMN_FAMILY = "input_words_count";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
 
@@ -61,6 +63,24 @@ public class WordCountCounters extends Configured implements Tool
         System.exit(0);
     }
 
+    public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable>
+    {
+        long sum = -1;
+        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
+        {   
+            if (sum < 0)
+                sum = 0;
+
+            logger.debug("read " + key + ":count_num from " + context.getInputSplit());
+            sum += Long.valueOf(row.getString("count_num"));
+        }
+
+        protected void cleanup(Context context) throws IOException, InterruptedException {
+            if (sum > 0)
+                context.write(new Text("total_count"), new LongWritable(sum));
+        }
+    }
+
     public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
     {
         long sum = -1;
@@ -95,7 +115,6 @@ public class WordCountCounters extends Configured implements Tool
         }
     }
 
-    
     public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
     {
         long sum = 0;
@@ -110,25 +129,40 @@ public class WordCountCounters extends Configured implements Tool
 
     public int run(String[] args) throws Exception
     {
+        String inputMapperType = "native";
+        if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
+        {
+            String[] arg0 = args[0].split("=");
+            if (arg0 != null && arg0.length == 2)
+                inputMapperType = arg0[1];
+        }
         Job job = new Job(getConf(), "wordcountcounters");
-        job.setJarByClass(WordCountCounters.class);
-        job.setMapperClass(SumMapper.class);
 
         job.setCombinerClass(ReducerToFilesystem.class);
         job.setReducerClass(ReducerToFilesystem.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(LongWritable.class);
-        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
-
-        job.setInputFormatClass(CqlPagingInputFormat.class);
+        job.setJarByClass(WordCountCounters.class); 
 
-        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
 
         CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+        if ("native".equals(inputMapperType))
+        {
+            job.setMapperClass(SumNativeMapper.class);
+            job.setInputFormatClass(CqlInputFormat.class);
+            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
+        }
+        else
+        {
+            job.setMapperClass(SumMapper.class);
+            job.setInputFormatClass(CqlPagingInputFormat.class);
+            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        }
 
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
         job.waitForCompletion(true);
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index cb61d05..a4a9c44 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -19,13 +19,71 @@ package org.apache.cassandra.hadoop.cql3;
 * under the License.
 *
 */
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
 public class CqlConfigHelper
 {
    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separated by commas
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
+    private static final String INPUT_CQL = "cassandra.input.cql";
+
+    private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port";
+    private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host";
+    private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host";
+    private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection"; 
+    private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection";
+    private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout";
+    private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout";
+    private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size";
+    private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size";
+    private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger";
+    private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay";
+    private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address";
+    private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive";
+    private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider";
+    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path";
+    private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path";
+    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = "cassandra.input.native.ssl.trust.store.password";
+    private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
+    private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
+
     private static final String OUTPUT_CQL = "cassandra.output.cql";
 
     /**
@@ -85,25 +143,506 @@ public class CqlConfigHelper
         
         conf.set(OUTPUT_CQL, cql);
     }
-    
-    
+
+    public static void setInputCql(Configuration conf, String cql)
+    {
+        if (cql == null || cql.isEmpty())
+            return;
+
+        conf.set(INPUT_CQL, cql);
+    }
+
+    public static Optional<Integer> getInputCoreConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf);
+    }
+
+    public static Optional<Integer> getInputMaxConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf);
+    }
+
+    public static int getInputNativePort(Configuration conf)
+    {
+        return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
+    }
+
+    public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
+    }
+
+    public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf);
+    }
+
+    public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf);
+    }
+
+    public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf);
+    }
+
+    public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf);
+    }
+
+    public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf);
+    }
+
+    public static Optional<Integer> getInputNativeSolinger(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_SOLINGER, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf);
+    }
+
+    public static Optional<String> getInputNativeAuthProvider(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf);
+    }
+
     public static String getInputcolumns(Configuration conf)
     {
         return conf.get(INPUT_CQL_COLUMNS_CONFIG);
     }
-    
-    public static String getInputPageRowSize(Configuration conf)
+
+    public static Optional<Integer> getInputPageRowSize(Configuration conf)
     {
-        return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
+        return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf);
     }
-    
+
     public static String getInputWhereClauses(Configuration conf)
     {
         return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
     }
-    
+
+    public static String getInputCql(Configuration conf)
+    {
+        return conf.get(INPUT_CQL);
+    }
+
     public static String getOutputCql(Configuration conf)
     {
         return conf.get(OUTPUT_CQL);
     }
+
+    public static Cluster getInputCluster(String host, Configuration conf)
+    {
+        int port = getInputNativePort(conf);
+        Optional<AuthProvider> authProvider = getAuthProvider(conf);
+        Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+        SocketOptions socketOptions = getReadSocketOptions(conf);
+        QueryOptions queryOptions = getReadQueryOptions(conf);
+        PoolingOptions poolingOptions = getReadPoolingOptions(conf);
+        
+        Cluster.Builder builder = Cluster.builder()
+                                         .addContactPoint(host)
+                                         .withPort(port)
+                                         .withCompression(ProtocolOptions.Compression.NONE);
+
+        if (authProvider.isPresent())
+            builder.withAuthProvider(authProvider.get());
+        if (sslOptions.isPresent())
+            builder.withSSL(sslOptions.get());
+
+        builder.withLoadBalancingPolicy(loadBalancingPolicy)
+               .withSocketOptions(socketOptions)
+               .withQueryOptions(queryOptions)
+               .withPoolingOptions(poolingOptions);
+
+        return builder.build();
+    }
+
+    public static void setInputCoreConnections(Configuration conf, String connections)
+    {
+        conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
+    }
+
+    public static void setInputMaxConnections(Configuration conf, String connections)
+    {
+        conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections);
+    }
+
+    public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs)
+    {
+        conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs);
+    }
+
+    public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs)
+    {
+        conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs);
+    }    
+
+    public static void setInputNativeConnectionTimeout(Configuration conf, String timeout)
+    {
+        conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout);
+    }
+
+    public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout)
+    {
+        conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout);
+    }
+
+    public static void setInputNativeReceiveBufferSize(Configuration conf, String size)
+    {
+        conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size);
+    }
+
+    public static void setInputNativeSendBufferSize(Configuration conf, String size)
+    {
+        conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size);
+    }
+
+    public static void setInputNativeSolinger(Configuration conf, String solinger)
+    {
+        conf.set(INPUT_NATIVE_SOLINGER, solinger);
+    }
+
+    public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay)
+    {
+        conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay);
+    }
+
+    public static void setInputNativeAuthProvider(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider);
+    }
+
+    public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, authProvider);
+    } 
+
+    public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider);
+    }
+
+    public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider);
+    }
+
+    public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider);
+    }
+
+    public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider);
+    }
+
+    public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress)
+    {
+        conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress);
+    }
+
+    public static void setInputNativeKeepAlive(Configuration conf, String keepAlive)
+    {
+        conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive);
+    }
+
+    public static void setInputNativePort(Configuration conf, String port)
+    {
+        conf.set(INPUT_NATIVE_PORT, port);
+    }
+
+    private static PoolingOptions getReadPoolingOptions(Configuration conf)
+    {
+        Optional<Integer> coreConnections = getInputCoreConnections(conf);
+        Optional<Integer> maxConnections = getInputMaxConnections(conf);
+        Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf);
+        Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf);
+        
+        PoolingOptions poolingOptions = new PoolingOptions();
+
+        if (coreConnections.isPresent())
+            poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get());
+        if (maxConnections.isPresent())
+            poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get());
+        if (maxSimultaneousRequests.isPresent())
+            poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get());
+        if (minSimultaneousRequests.isPresent())
+            poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get());
+
+        poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0)
+                      .setMaxConnectionsPerHost(HostDistance.REMOTE, 0)
+                      .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0)
+                      .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0);
+
+        return poolingOptions;
+    }  
+
+    private static QueryOptions getReadQueryOptions(Configuration conf)
+    {
+        String CL = ConfigHelper.getReadConsistencyLevel(conf);
+        Optional<Integer> fetchSize = getInputPageRowSize(conf);
+        QueryOptions queryOptions = new QueryOptions();
+        if (CL != null && !CL.isEmpty())
+            queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL));
+
+        if (fetchSize.isPresent())
+            queryOptions.setFetchSize(fetchSize.get());
+        return queryOptions;
+    }
+
+    private static SocketOptions getReadSocketOptions(Configuration conf)
+    {
+        SocketOptions socketOptions = new SocketOptions();
+        Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf);
+        Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf);
+        Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf);
+        Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf);
+        Optional<Integer> soLinger = getInputNativeSolinger(conf);
+        Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf);
+        Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf);       
+        Optional<Boolean> keepAlive = getInputNativeKeepAlive(conf);
+
+        if (connectTimeoutMillis.isPresent())
+            socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get());
+        if (readTimeoutMillis.isPresent())
+            socketOptions.setReadTimeoutMillis(readTimeoutMillis.get());
+        if (receiveBufferSize.isPresent())
+            socketOptions.setReceiveBufferSize(receiveBufferSize.get());
+        if (sendBufferSize.isPresent())
+            socketOptions.setSendBufferSize(sendBufferSize.get());
+        if (soLinger.isPresent())
+            socketOptions.setSoLinger(soLinger.get());
+        if (tcpNoDelay.isPresent())
+            socketOptions.setTcpNoDelay(tcpNoDelay.get());
+        if (reuseAddress.isPresent())
+            socketOptions.setReuseAddress(reuseAddress.get());
+        if (keepAlive.isPresent())
+            socketOptions.setKeepAlive(keepAlive.get());     
+
+        return socketOptions;
+    }
+
+    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
+    {
+        return new LoadBalancingPolicy()
+        {
+            private Host origHost;
+            private Set<Host> liveRemoteHosts = Sets.newHashSet();
+
+            @Override
+            public void onAdd(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = host;
+            }
+
+            @Override
+            public void onDown(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = null;
+                liveRemoteHosts.remove(host);
+            }
+
+            @Override
+            public void onRemove(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = null;
+                liveRemoteHosts.remove(host);
+            }
+
+            @Override
+            public void onUp(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = host;
+                liveRemoteHosts.add(host);
+            }
+
+            @Override
+            public HostDistance distance(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    return HostDistance.LOCAL;
+                else
+                    return HostDistance.REMOTE;
+            }
+
+            @Override
+            public void init(Cluster cluster, Collection<Host> hosts)
+            {
+                for (Host host : hosts)
+                {
+                    if (host.getAddress().getHostName().equals(stickHost))
+                    {
+                        origHost = host;
+                        break;
+                    }
+                }
+            }
+
+            @Override
+            public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+            {
+                if (origHost != null)
+                {
+                    return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
+                }
+                else
+                {
+                    return liveRemoteHosts.iterator();
+                }
+            }
+        };
+    }
+
+    private static Optional<AuthProvider> getAuthProvider(Configuration conf)
+    {
+        Optional<String> authProvider = getInputNativeAuthProvider(conf);
+        if (!authProvider.isPresent())
+            return Optional.absent();
+
+        return Optional.of(getClientAuthProvider(authProvider.get()));  
+    }
+
+    private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+    {
+        Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
+        Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
+        Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf);
+        Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf);
+        Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf);
+        
+        if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent())
+        {
+            SSLContext context;
+            try
+            {
+                context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get());
+            }
+            catch (UnrecoverableKeyException | KeyManagementException |
+                    NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES;
+            if (cipherSuites.isPresent())
+                css = cipherSuites.get().split(",");
+            return Optional.of(new SSLOptions(context,css));
+        }
+        return Optional.absent();
+    }
+
+    private static Optional<Integer> getIntSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(Integer.parseInt(setting));  
+    }
+
+    private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(Boolean.parseBoolean(setting));  
+    }
+
+    private static Optional<String> getStringSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(setting);  
+    }
+
+    private static AuthProvider getClientAuthProvider(String factoryClassName)
+    {
+        try
+        {
+            return (AuthProvider) Class.forName(factoryClassName).newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e);
+        }
+    }
+
+    private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
+            throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
+    {
+        FileInputStream tsf = null;
+        FileInputStream ksf = null;
+        SSLContext ctx = null;
+        try
+        {
+            tsf = new FileInputStream(truststorePath);
+            ksf = new FileInputStream(keystorePath);
+            ctx = SSLContext.getInstance("SSL");
+
+            KeyStore ts = KeyStore.getInstance("JKS");
+            ts.load(tsf, truststorePassword.toCharArray());
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            tmf.init(ts);
+
+            KeyStore ks = KeyStore.getInstance("JKS");
+            ks.load(ksf, keystorePassword.toCharArray());
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            kmf.init(ks, keystorePassword.toCharArray());
+
+            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
+        }
+        finally
+        {
+            FileUtils.closeQuietly(tsf);
+            FileUtils.closeQuietly(ksf);
+        }
+        return ctx;
+    }
 }
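The native connection is tuned entirely through the job Configuration via the new setters above. A minimal sketch of setting a few of the knobs (not part of the commit; values are illustrative):

import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.hadoop.conf.Configuration;

public class NativeConnectionTuningSketch
{
    public static void tune(Configuration conf)
    {
        // All values are passed as strings; unset options fall back to the
        // driver defaults (the getters return Optional.absent() and the
        // cluster builder leaves them alone).
        CqlConfigHelper.setInputNativePort(conf, "9042");
        CqlConfigHelper.setInputCoreConnections(conf, "2");
        CqlConfigHelper.setInputMaxConnections(conf, "8");
        CqlConfigHelper.setInputNativeConnectionTimeout(conf, "5000");      // millis
        CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, "12000"); // millis
        CqlConfigHelper.setInputNativeTcpNodelay(conf, "true");
    }
}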

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
new file mode 100644
index 0000000..e1cdf32
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import com.datastax.driver.core.Row;
+
+/**
+ * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
+ *
+ * At minimum, you need to set the keyspace and column family in your Hadoop job
+ * Configuration. The ConfigHelper class is provided to make this simple:
+ *   ConfigHelper.setInputColumnFamily
+ *
+ * You can also configure the number of rows per InputSplit with
+ *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *
+ * The number of CQL rows per page is configured with
+ *   CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
+ *   should set it to "as big as possible, but no bigger": it sets the page size of
+ *   the CQL query, so it needs to be big enough to minimize network overhead, but
+ *   not so big that it risks running out of memory.
+ *
+ * Other native protocol connection parameters can be set through CqlConfigHelper.
+ */
+public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
+{
+    public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
+            throws IOException
+    {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+        {
+            @Override
+            public void progress()
+            {
+                reporter.progress();
+            }
+        };
+
+        CqlRecordReader recordReader = new CqlRecordReader();
+        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
+        return recordReader;
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
+            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+            InterruptedException
+    {
+        return new CqlRecordReader();
+    }
+
+}
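As the class comment above notes, the page row size bounds how many CQL rows each network round trip fetches. A minimal sketch of setting it (not part of the commit; the value is illustrative):

import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.hadoop.conf.Configuration;

public class PageRowSizeSketch
{
    public static void configure(Configuration conf)
    {
        // With CqlInputFormat this value becomes the native driver's fetch
        // size (see CqlConfigHelper.getReadQueryOptions); left unset, the
        // driver's default applies.
        CqlConfigHelper.setInputCQLPageRowSize(conf, "1000");
    }
}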

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index cee4b4b..b692280 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
         try
         {
-            pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
+            pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get();
         }
         catch (NumberFormatException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
new file mode 100644
index 0000000..67f76cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+/**
+ * CqlRecordReader reads the rows returned from the CQL query.
+ * It uses CQL auto-paging.
+ * <p/>
+ * The key is a Long: a local CQL row id, starting from 0.
+ * <p/>
+ * The value is a Row: a C* java driver CQL result set row. The input CQL query must satisfy:
+ * 1) the select clause must include the partition key columns (to calculate progress based on the actual CF rows processed);
+ * 2) the where clause must include token(partition_key1, ..., partition_keyn) > ? and
+ *       token(partition_key1, ..., partition_keyn) <= ? (in the right order)
+ */
+public class CqlRecordReader extends RecordReader<Long, Row>
+        implements org.apache.hadoop.mapred.RecordReader<Long, Row>
+{
+    private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+
+    private ColumnFamilySplit split;
+    private RowIterator rowIterator;
+
+    private Pair<Long, Row> currentRow;
+    private int totalRowCount; // total number of rows to fetch
+    private String keyspace;
+    private String cfName;
+    private String cqlQuery;
+    private Cluster cluster;
+    private Session session;
+    private IPartitioner partitioner;
+
+    // partition keys -- key aliases
+    private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+
+    public CqlRecordReader()
+    {
+        super();
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+    {
+        this.split = (ColumnFamilySplit) split;
+        Configuration conf = context.getConfiguration();
+        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
+                      ? (int) this.split.getLength()
+                      : ConfigHelper.getInputSplitSize(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        keyspace = ConfigHelper.getInputKeyspace(conf);              
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+        try
+        {
+            if (cluster != null)
+                return;
+
+            // create connection using the native protocol
+            String[] locations = split.getLocations();
+            Exception lastException = null;
+            for (String location : locations)
+            {
+                try
+                {
+                    cluster = CqlConfigHelper.getInputCluster(location, conf);
+                    break;
+                }
+                catch (Exception e)
+                {
+                    lastException = e;
+                    logger.warn("Failed to create authenticated client to {}", location);
+                }
+            }
+            if (cluster == null && lastException != null)
+                throw lastException;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        if (cluster != null)
+            session = cluster.connect(keyspace);
+        rowIterator = new RowIterator();
+        logger.debug("created {}", rowIterator);
+    }
+
+    public void close()
+    {
+        if (session != null)
+            session.close();
+    }
+
+    public Long getCurrentKey()
+    {
+        return currentRow.left;
+    }
+
+    public Row getCurrentValue()
+    {
+        return currentRow.right;
+    }
+
+    public float getProgress()
+    {
+        if (!rowIterator.hasNext())
+            return 1.0F;
+
+        // the progress is likely to be reported slightly off the actual but close enough
+        float progress = ((float) rowIterator.totalRead / totalRowCount);
+        return progress > 1.0F ? 1.0F : progress;
+    }
+
+    public boolean nextKeyValue() throws IOException
+    {
+        if (!rowIterator.hasNext())
+        {
+            logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount);
+            return false;
+        }
+
+        try
+        {
+            currentRow = rowIterator.next();
+        }
+        catch (Exception e)
+        {
+            // rethrow as IOException, so the client can catch and handle it on its side
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
+        }
+        return true;
+    }
+
+    // Because the old Hadoop API wants us to write to the key and value
+    // and the new asks for them, we need to copy the output of the new API
+    // to the old. Thus, expect a small performance hit.
+    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
+    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
+    public boolean next(Long key, Row value) throws IOException
+    {
+        if (nextKeyValue())
+        {
+            ((WrappedRow)value).setRow(getCurrentValue());
+            return true;
+        }
+        return false;
+    }
+
+    public long getPos() throws IOException
+    {
+        return (long) rowIterator.totalRead;
+    }
+
+    public Long createKey()
+    {
+        return new Long(0L);
+    }
+
+    public Row createValue()
+    {
+        return new WrappedRow();
+    }
+
+    /** CQL row iterator.
+     *  The input CQL query must satisfy:
+     *  1) the select clause must include the key columns (if we use a partition key based row count);
+     *  2) the where clause must include token(partition_key1, ..., partition_keyn) > ? and
+     *     token(partition_key1, ..., partition_keyn) <= ?
+     */
+    private class RowIterator extends AbstractIterator<Pair<Long, Row>>
+    {
+        private long keyId = 0L;
+        protected int totalRead = 0; // total number of cf rows read
+        protected Iterator<Row> rows;
+        private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key
+
+        public RowIterator()
+        {
+            if (session == null)
+                throw new RuntimeException("Can't create connection session");
+
+            AbstractType type = partitioner.getTokenValidator();
+            ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
+            for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
+                partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
+            rows = rs.iterator();
+        }
+
+        protected Pair<Long, Row> computeNext()
+        {
+            if (rows == null || !rows.hasNext())
+                return endOfData();
+
+            Row row = rows.next();
+            Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>(); 
+            for (String column : partitionBoundColumns.keySet())
+                keyColumns.put(column, row.getBytesUnsafe(column));
+
+            // increase total CF row read
+            if (previousRowKey.isEmpty() && !keyColumns.isEmpty())
+            {
+                previousRowKey = keyColumns;
+                totalRead++;
+            }
+            else
+            {
+                for (String column : partitionBoundColumns.keySet())
+                {
+                    if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0)
+                    {
+                        previousRowKey = keyColumns;
+                        totalRead++;
+                        break;
+                    }
+                }
+            }
+            keyId ++;
+            return Pair.create(keyId, row);
+        }
+    }
+
+    private class WrappedRow implements Row
+    {
+        private Row row;
+
+        public void setRow(Row row)
+        {
+            this.row = row;
+        }
+
+        @Override
+        public ColumnDefinitions getColumnDefinitions()
+        {
+            return row.getColumnDefinitions();
+        }
+
+        @Override
+        public boolean isNull(int i)
+        {
+            return row.isNull(i);
+        }
+
+        @Override
+        public boolean isNull(String name)
+        {
+            return row.isNull(name);
+        }
+
+        @Override
+        public boolean getBool(int i)
+        {
+            return row.getBool(i);
+        }
+
+        @Override
+        public boolean getBool(String name)
+        {
+            return row.getBool(name);
+        }
+
+        @Override
+        public int getInt(int i)
+        {
+            return row.getInt(i);
+        }
+
+        @Override
+        public int getInt(String name)
+        {
+            return row.getInt(name);
+        }
+
+        @Override
+        public long getLong(int i)
+        {
+            return row.getLong(i);
+        }
+
+        @Override
+        public long getLong(String name)
+        {
+            return row.getLong(name);
+        }
+
+        @Override
+        public Date getDate(int i)
+        {
+            return row.getDate(i);
+        }
+
+        @Override
+        public Date getDate(String name)
+        {
+            return row.getDate(name);
+        }
+
+        @Override
+        public float getFloat(int i)
+        {
+            return row.getFloat(i);
+        }
+
+        @Override
+        public float getFloat(String name)
+        {
+            return row.getFloat(name);
+        }
+
+        @Override
+        public double getDouble(int i)
+        {
+            return row.getDouble(i);
+        }
+
+        @Override
+        public double getDouble(String name)
+        {
+            return row.getDouble(name);
+        }
+
+        @Override
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.getBytesUnsafe(i);
+        }
+
+        @Override
+        public ByteBuffer getBytesUnsafe(String name)
+        {
+            return row.getBytesUnsafe(name);
+        }
+
+        @Override
+        public ByteBuffer getBytes(int i)
+        {
+            return row.getBytes(i);
+        }
+
+        @Override
+        public ByteBuffer getBytes(String name)
+        {
+            return row.getBytes(name);
+        }
+
+        @Override
+        public String getString(int i)
+        {
+            return row.getString(i);
+        }
+
+        @Override
+        public String getString(String name)
+        {
+            return row.getString(name);
+        }
+
+        @Override
+        public BigInteger getVarint(int i)
+        {
+            return row.getVarint(i);
+        }
+
+        @Override
+        public BigInteger getVarint(String name)
+        {
+            return row.getVarint(name);
+        }
+
+        @Override
+        public BigDecimal getDecimal(int i)
+        {
+            return row.getDecimal(i);
+        }
+
+        @Override
+        public BigDecimal getDecimal(String name)
+        {
+            return row.getDecimal(name);
+        }
+
+        @Override
+        public UUID getUUID(int i)
+        {
+            return row.getUUID(i);
+        }
+
+        @Override
+        public UUID getUUID(String name)
+        {
+            return row.getUUID(name);
+        }
+
+        @Override
+        public InetAddress getInet(int i)
+        {
+            return row.getInet(i);
+        }
+
+        @Override
+        public InetAddress getInet(String name)
+        {
+            return row.getInet(name);
+        }
+
+        @Override
+        public <T> List<T> getList(int i, Class<T> elementsClass)
+        {
+            return row.getList(i, elementsClass);
+        }
+
+        @Override
+        public <T> List<T> getList(String name, Class<T> elementsClass)
+        {
+            return row.getList(name, elementsClass);
+        }
+
+        @Override
+        public <T> Set<T> getSet(int i, Class<T> elementsClass)
+        {
+            return row.getSet(i, elementsClass);
+        }
+
+        @Override
+        public <T> Set<T> getSet(String name, Class<T> elementsClass)
+        {
+            return row.getSet(name, elementsClass);
+        }
+
+        @Override
+        public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
+        {
+            return row.getMap(i, keysClass, valuesClass);
+        }
+
+        @Override
+        public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
+        {
+            return row.getMap(name, keysClass, valuesClass);
+        }
+    }
+}
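The reader implements both the mapreduce and the mapred RecordReader interfaces; under the old API, WrappedRow above is the mutable value object that next() fills in. A minimal sketch of the consuming loop (hypothetical helper, not part of the commit; assumes the reader was obtained and initialized via CqlInputFormat.getRecordReader):

import java.io.IOException;

import org.apache.cassandra.hadoop.cql3.CqlRecordReader;

import com.datastax.driver.core.Row;

public class OldApiLoopSketch
{
    public static long count(CqlRecordReader reader) throws IOException
    {
        Long key = reader.createKey();
        Row value = reader.createValue(); // actually a WrappedRow the reader fills in
        long rows = 0;
        while (reader.next(key, value))
        {
            // value now delegates to the current driver Row
            rows++;
        }
        return rows;
    }
}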

