From: slebresne@apache.org
To: commits@cassandra.apache.org
Subject: git commit: Add CqlRecordReader to take advantage of native CQL pagination
Date: Wed, 26 Mar 2014 14:36:07 +0000 (UTC)

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 58e524e77 -> d7cb97005


Add CqlRecordReader to take advantage of native CQL pagination

patch by alexliu68; reviewed by pkolaczk for CASSANDRA-6311


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d7cb9700
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d7cb9700
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d7cb9700

Branch: refs/heads/cassandra-2.0
Commit: d7cb9700538c6ad9921902f81c081eef77a037b7
Parents: 58e524e
Author: Sylvain Lebresne
Authored: Wed Mar 26 15:34:59 2014 +0100
Committer: Sylvain Lebresne
Committed: Wed Mar 26 15:34:59 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   4 +-
 examples/hadoop_cql3_word_count/bin/word_count  |   3 +-
 .../bin/word_count_counters                     |   4 +-
 .../hadoop_cql3_word_count/src/WordCount.java   |  77 ++-
 .../src/WordCountCounters.java                  |  54 +-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  | 553 ++++++++++++++++++-
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  80 +++
 .../hadoop/cql3/CqlPagingRecordReader.java      |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 487 ++++++++++++++++
 10 files changed, 1235 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00b98fa..e971df1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@
  * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
  * Improve MeteredFlusher handling of MF-unaffected column families (CASSANDRA-6867)
+ * Add CqlRecordReader using native pagination (CASSANDRA-6311)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)
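
For readers who want to try the new input path directly, here is a minimal
job-setup sketch, assuming a hypothetical keyspace "mykeyspace", table
"mytable" and partition key "id" (the real, runnable usage is in the
WordCount example changes further down):

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class NativeInputJobSketch
    {
        // Wires a job to CqlInputFormat; "mykeyspace"/"mytable"/"id" are placeholders.
        public static void configure(Job job)
        {
            ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), "mykeyspace", "mytable");
            ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
            job.setInputFormatClass(CqlInputFormat.class);
            // The token(...) bind markers are required: each split binds its own token range.
            CqlConfigHelper.setInputCql(job.getConfiguration(),
                "select * from mytable where token(id) > ? and token(id) <= ? allow filtering");
        }
    }
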
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 464dece..a15415b 100644
--- a/build.xml
+++ b/build.xml
[The three hunks at lines 380, 410 and 473 add and adjust dependency/classpath
elements, but the XML element bodies were stripped in archiving and are not
recoverable here; see the Diff link above for the full hunks.]


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count
index a0c5aa0..974a39a 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count
+++ b/examples/hadoop_cql3_word_count/bin/word_count
@@ -56,6 +56,7 @@ if [ "x$JAVA" = "x" ]; then
 fi
 
 OUTPUT_REDUCER=cassandra
+INPUT_MAPPER=native
 
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
index 7793477..0b69b40 100644
--- a/examples/hadoop_cql3_word_count/bin/word_count_counters
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -54,5 +54,7 @@ if [ "x$JAVA" = "x" ]; then
   exit 1
 fi
 
+INPUT_MAPPER=native
+
 #echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index bc81a53..519a98f 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -37,10 +38,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import com.datastax.driver.core.Row;
 import java.nio.charset.CharacterCodingException;
 
 /**
@@ -60,7 +62,7 @@ import java.nio.charset.CharacterCodingException;
 public class WordCount extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
-
+    static final String INPUT_MAPPER_VAR = "input_mapper";
     static final String KEYSPACE = "cql3_worldcount";
     static final String COLUMN_FAMILY = "inputs";
@@ -68,7 +70,6 @@ public class WordCount extends Configured implements Tool
     static final String OUTPUT_COLUMN_FAMILY = "output_words";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
-    private static final String PRIMARY_KEY = "row_key";
 
     public static void main(String[] args) throws Exception
@@ -108,6 +109,30 @@ public class WordCount extends Configured implements Tool
         }
     }
 
+    public static class NativeTokenizerMapper extends Mapper<Long, Row, Text, IntWritable>
+    {
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+        private ByteBuffer sourceColumn;
+
+        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException
+        {
+        }
+
+        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
+        {
+            String value = row.getString("line");
+            logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()});
+            StringTokenizer itr = new StringTokenizer(value);
+            while (itr.hasMoreTokens())
+            {
+                word.set(itr.nextToken());
+                context.write(word, one);
+            }
+        }
+    }
+
     public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
     {
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -149,17 +174,41 @@ public class WordCount extends Configured implements Tool
     public int run(String[] args) throws Exception
     {
         String outputReducerType = "filesystem";
-        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
+        String inputMapperType = "native";
+        String outputReducer = null;
+        String inputMapper = null;
+
+        if (args != null)
         {
-            String[] s = args[0].split("=");
+            if (args[0].startsWith(OUTPUT_REDUCER_VAR))
+                outputReducer = args[0];
+            if (args[0].startsWith(INPUT_MAPPER_VAR))
+                inputMapper = args[0];
+
+            if (args.length == 2)
+            {
+                if (args[1].startsWith(OUTPUT_REDUCER_VAR))
+                    outputReducer = args[1];
+                if (args[1].startsWith(INPUT_MAPPER_VAR))
+                    inputMapper = args[1];
+            }
+        }
+
+        if (outputReducer != null)
+        {
+            String[] s = outputReducer.split("=");
             if (s != null && s.length == 2)
                 outputReducerType = s[1];
         }
         logger.info("output reducer type: " + outputReducerType);
-
+        if (inputMapper != null)
+        {
+            String[] s = inputMapper.split("=");
+            if (s != null && s.length == 2)
+                inputMapperType = s[1];
+        }
         Job job = new Job(getConf(), "wordcount");
         job.setJarByClass(WordCount.class);
-        job.setMapperClass(TokenizerMapper.class);
 
         if (outputReducerType.equalsIgnoreCase("filesystem"))
         {
@@ -189,9 +238,19 @@ public class WordCount extends Configured implements Tool
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         }
 
-        job.setInputFormatClass(CqlPagingInputFormat.class);
+        if (inputMapperType.equalsIgnoreCase("native"))
+        {
+            job.setMapperClass(NativeTokenizerMapper.class);
+            job.setInputFormatClass(CqlInputFormat.class);
+            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering");
+        }
+        else
+        {
+            job.setMapperClass(TokenizerMapper.class);
+            job.setInputFormatClass(CqlPagingInputFormat.class);
+            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        }
 
-        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 542a473..74de9ab 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
+import org.apache.cassandra.hadoop.cql3.CqlInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import com.datastax.driver.core.Row;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -51,6 +52,7 @@ public class WordCountCounters extends Configured implements Tool
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
 
+    static final String INPUT_MAPPER_VAR = "input_mapper";
     static final String COUNTER_COLUMN_FAMILY = "input_words_count";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
 
@@ -61,6 +63,24 @@ public class WordCountCounters extends Configured implements Tool
         System.exit(0);
     }
 
+    public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable>
+    {
+        long sum = -1;
+        public void map(Long key, Row row, Context context) throws IOException, InterruptedException
+        {
+            if (sum < 0)
+                sum = 0;
+
+            logger.debug("read " + key + ":count_num from " + context.getInputSplit());
+            sum += Long.valueOf(row.getString("count_num"));
+        }
+
+        protected void cleanup(Context context) throws IOException, InterruptedException
+        {
+            if (sum > 0)
+                context.write(new Text("total_count"), new LongWritable(sum));
+        }
+    }
+
     public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
     {
         long sum = -1;
@@ -95,7 +115,6 @@ public class WordCountCounters extends Configured implements Tool
         }
     }
 
-
     public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
     {
         long sum = 0;
@@ -110,25 +129,40 @@ public class WordCountCounters extends Configured implements Tool
 
     public int run(String[] args) throws Exception
     {
+        String inputMapperType = "native";
+        if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
+        {
+            String[] arg0 = args[0].split("=");
+            if (arg0 != null && arg0.length == 2)
+                inputMapperType = arg0[1];
+        }
         Job job = new Job(getConf(), "wordcountcounters");
-        job.setJarByClass(WordCountCounters.class);
-        job.setMapperClass(SumMapper.class);
 
         job.setCombinerClass(ReducerToFilesystem.class);
         job.setReducerClass(ReducerToFilesystem.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(LongWritable.class);
-        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
-
-        job.setInputFormatClass(CqlPagingInputFormat.class);
+        job.setJarByClass(WordCountCounters.class);
 
-        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
         ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
         ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
         CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
 
+        if ("native".equals(inputMapperType))
+        {
+            job.setMapperClass(SumNativeMapper.class);
+            job.setInputFormatClass(CqlInputFormat.class);
+            CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
+        }
+        else
+        {
+            job.setMapperClass(SumMapper.class);
+            job.setInputFormatClass(CqlPagingInputFormat.class);
+            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        }
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+
         job.waitForCompletion(true);
         return 0;
     }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index cb61d05..a4a9c44 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -19,13 +19,71 @@ package org.apache.cassandra.hadoop.cql3;
  * under the License.
  *
  */
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
 public class CqlConfigHelper
 {
     private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // comma-separated
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
+    private static final String INPUT_CQL = "cassandra.input.cql";
+
+    private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port";
+    private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host";
+    private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host";
+    private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection";
+    private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection";
+    private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout";
+    private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout";
+    private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size";
+    private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size";
+    private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger";
+    private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay";
+    private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address";
+    private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive";
+    private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider";
+    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path";
+    private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path";
+    private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = "cassandra.input.native.ssl.trust.store.password";
+    private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
+    private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
+
     private static final String OUTPUT_CQL = "cassandra.output.cql";
 
     /**
@@ -85,25 +143,506 @@ public class CqlConfigHelper
         conf.set(OUTPUT_CQL, cql);
     }
 
-
-
+
+    public static void setInputCql(Configuration conf, String cql)
+    {
+        if (cql == null || cql.isEmpty())
+            return;
+
+        conf.set(INPUT_CQL, cql);
+    }
+
+    public static Optional<Integer> getInputCoreConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf);
+    }
+
+    public static Optional<Integer> getInputMaxConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf);
+    }
+
+    public static int getInputNativePort(Configuration conf)
+    {
+        return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
+    }
+
+    public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
+    }
+
+    public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf);
+    }
+
+    public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf);
+    }
+
+    public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf);
+    }
+
+    public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf);
+    }
+
+    public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf);
+    }
+
+    public static Optional<Integer> getInputNativeSolinger(Configuration conf)
+    {
+        return getIntSetting(INPUT_NATIVE_SOLINGER, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf);
+    }
+
+    public static Optional<String> getInputNativeAuthProvider(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf);
+    }
+
+    public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf)
+    {
+        return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf);
+    }
+
+    public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf)
+    {
+        return getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf);
+    }
+
     public static String getInputcolumns(Configuration conf)
     {
         return conf.get(INPUT_CQL_COLUMNS_CONFIG);
     }
-
-    public static String getInputPageRowSize(Configuration conf)
+
+    public static Optional<Integer> getInputPageRowSize(Configuration conf)
     {
-        return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG);
+        return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf);
     }
-
+
     public static String getInputWhereClauses(Configuration conf)
     {
         return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG);
     }
-
+
+    public static String getInputCql(Configuration conf)
+    {
+        return conf.get(INPUT_CQL);
+    }
+
     public static String getOutputCql(Configuration conf)
     {
         return conf.get(OUTPUT_CQL);
     }
+
+    public static Cluster getInputCluster(String host, Configuration conf)
+    {
+        int port = getInputNativePort(conf);
+        Optional<AuthProvider> authProvider = getAuthProvider(conf);
+        Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+        SocketOptions socketOptions = getReadSocketOptions(conf);
+        QueryOptions queryOptions = getReadQueryOptions(conf);
+        PoolingOptions poolingOptions = getReadPoolingOptions(conf);
+
+        Cluster.Builder builder = Cluster.builder()
+                                         .addContactPoint(host)
+                                         .withPort(port)
+                                         .withCompression(ProtocolOptions.Compression.NONE);
+
+        if (authProvider.isPresent())
+            builder.withAuthProvider(authProvider.get());
+        if (sslOptions.isPresent())
+            builder.withSSL(sslOptions.get());
+
+        builder.withLoadBalancingPolicy(loadBalancingPolicy)
+               .withSocketOptions(socketOptions)
+               .withQueryOptions(queryOptions)
+               .withPoolingOptions(poolingOptions);
+
+        return builder.build();
+    }
+
+    public static void setInputCoreConnections(Configuration conf, String connections)
+    {
+        conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
+    }
+
+    public static void setInputMaxConnections(Configuration conf, String connections)
+    {
+        conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections);
+    }
+
+    public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs)
+    {
+        conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs);
+    }
+
+    public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs)
+    {
+        conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs);
+    }
+
+    public static void setInputNativeConnectionTimeout(Configuration conf, String timeout)
+    {
+        conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout);
+    }
+
+    public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout)
+    {
+        conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout);
+    }
+
+    public static void setInputNativeReceiveBufferSize(Configuration conf, String size)
+    {
+        conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size);
+    }
+
+    public static void setInputNativeSendBufferSize(Configuration conf, String size)
+    {
+        conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size);
+    }
+
+    public static void setInputNativeSolinger(Configuration conf, String solinger)
+    {
+        conf.set(INPUT_NATIVE_SOLINGER, solinger);
+    }
+
+    public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay)
+    {
+        conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay);
+    }
+
+    public static void setInputNativeAuthProvider(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider);
+    }
+
+    public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, authProvider);
+    }
+
+    public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider);
+    }
+
+    public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider);
+    }
+
+    public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider);
+    }
+
+    public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider)
+    {
+        conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider);
+    }
+
+    public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress)
+    {
+        conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress);
+    }
+
+    public static void setInputNativeKeepAlive(Configuration conf, String keepAlive)
+    {
+        conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive);
+    }
+
+    public static void setInputNativePort(Configuration conf, String port)
+    {
+        conf.set(INPUT_NATIVE_PORT, port);
+    }
+
+    private static PoolingOptions getReadPoolingOptions(Configuration conf)
+    {
+        Optional<Integer> coreConnections = getInputCoreConnections(conf);
+        Optional<Integer> maxConnections = getInputMaxConnections(conf);
+        Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf);
+        Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf);
+
+        PoolingOptions poolingOptions = new PoolingOptions();
+
+        if (coreConnections.isPresent())
+            poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get());
+        if (maxConnections.isPresent())
+            poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get());
+        if (maxSimultaneousRequests.isPresent())
+            poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get());
+        if (minSimultaneousRequests.isPresent())
+            poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get());
+
+        poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0)
+                      .setMaxConnectionsPerHost(HostDistance.REMOTE, 0)
+                      .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0)
+                      .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0);
+
+        return poolingOptions;
+    }
+
+    private static QueryOptions getReadQueryOptions(Configuration conf)
+    {
+        String CL = ConfigHelper.getReadConsistencyLevel(conf);
+        Optional<Integer> fetchSize = getInputPageRowSize(conf);
+        QueryOptions queryOptions = new QueryOptions();
+        if (CL != null && !CL.isEmpty())
+            queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL));
+
+        if (fetchSize.isPresent())
+            queryOptions.setFetchSize(fetchSize.get());
+        return queryOptions;
+    }
+
+    private static SocketOptions getReadSocketOptions(Configuration conf)
+    {
+        SocketOptions socketOptions = new SocketOptions();
+        Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf);
+        Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf);
+        Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf);
+        Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf);
+        Optional<Integer> soLinger = getInputNativeSolinger(conf);
+        Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf);
+        Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf);
+        Optional<Boolean> keepAlive = getInputNativeKeepAlive(conf);
+
+        if (connectTimeoutMillis.isPresent())
+            socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get());
+        if (readTimeoutMillis.isPresent())
+            socketOptions.setReadTimeoutMillis(readTimeoutMillis.get());
+        if (receiveBufferSize.isPresent())
+            socketOptions.setReceiveBufferSize(receiveBufferSize.get());
+        if (sendBufferSize.isPresent())
+            socketOptions.setSendBufferSize(sendBufferSize.get());
+        if (soLinger.isPresent())
+            socketOptions.setSoLinger(soLinger.get());
+        if (tcpNoDelay.isPresent())
+            socketOptions.setTcpNoDelay(tcpNoDelay.get());
+        if (reuseAddress.isPresent())
+            socketOptions.setReuseAddress(reuseAddress.get());
+        if (keepAlive.isPresent())
+            socketOptions.setKeepAlive(keepAlive.get());
+
+        return socketOptions;
+    }
+
+    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost)
+    {
+        return new LoadBalancingPolicy()
+        {
+            private Host origHost;
+            private Set<Host> liveRemoteHosts = Sets.newHashSet();
+
+            @Override
+            public void onAdd(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = host;
+            }
+
+            @Override
+            public void onDown(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = null;
+                liveRemoteHosts.remove(host);
+            }
+
+            @Override
+            public void onRemove(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = null;
+                liveRemoteHosts.remove(host);
+            }
+
+            @Override
+            public void onUp(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    origHost = host;
+                liveRemoteHosts.add(host);
+            }
+
+            @Override
+            public HostDistance distance(Host host)
+            {
+                if (host.getAddress().getHostName().equals(stickHost))
+                    return HostDistance.LOCAL;
+                else
+                    return HostDistance.REMOTE;
+            }
+
+            @Override
+            public void init(Cluster cluster, Collection<Host> hosts)
+            {
+                for (Host host : hosts)
+                {
+                    if (host.getAddress().getHostName().equals(stickHost))
+                    {
+                        origHost = host;
+                        break;
+                    }
+                }
+            }
+
+            @Override
+            public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+            {
+                if (origHost != null)
+                {
+                    return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator());
+                }
+                else
+                {
+                    return liveRemoteHosts.iterator();
+                }
+            }
+        };
+    }
+
+    private static Optional<AuthProvider> getAuthProvider(Configuration conf)
+    {
+        Optional<String> authProvider = getInputNativeAuthProvider(conf);
+        if (!authProvider.isPresent())
+            return Optional.absent();
+
+        return Optional.of(getClientAuthProvider(authProvider.get()));
+    }
+
+    private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+    {
+        Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
+        Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);
+        Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf);
+        Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf);
+        Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf);
+
+        if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent())
+        {
+            SSLContext context;
+            try
+            {
+                context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get());
+            }
+            catch (UnrecoverableKeyException | KeyManagementException |
+                   NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES;
+            if (cipherSuites.isPresent())
+                css = cipherSuites.get().split(",");
+            return Optional.of(new SSLOptions(context, css));
+        }
+        return Optional.absent();
+    }
+
+    private static Optional<Integer> getIntSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(Integer.parseInt(setting));
+    }
+
+    private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(Boolean.parseBoolean(setting));
+    }
+
+    private static Optional<String> getStringSetting(String parameter, Configuration conf)
+    {
+        String setting = conf.get(parameter);
+        if (setting == null)
+            return Optional.absent();
+        return Optional.of(setting);
+    }
+
+    private static AuthProvider getClientAuthProvider(String factoryClassName)
+    {
+        try
+        {
+            return (AuthProvider) Class.forName(factoryClassName).newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e);
+        }
+    }
+
+    private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
+            throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
+    {
+        FileInputStream tsf = null;
+        FileInputStream ksf = null;
+        SSLContext ctx = null;
+        try
+        {
+            tsf = new FileInputStream(truststorePath);
+            ksf = new FileInputStream(keystorePath);
+            ctx = SSLContext.getInstance("SSL");
+
+            KeyStore ts = KeyStore.getInstance("JKS");
+            ts.load(tsf, truststorePassword.toCharArray());
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            tmf.init(ts);
+
+            KeyStore ks = KeyStore.getInstance("JKS");
+            ks.load(ksf, keystorePassword.toCharArray());
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            kmf.init(ks, keystorePassword.toCharArray());
+
+            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
+        }
+        finally
+        {
+            FileUtils.closeQuietly(tsf);
+            FileUtils.closeQuietly(ksf);
+        }
+        return ctx;
+    }
 }
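
All of the new native-protocol knobs are optional; values left unset fall
back to the driver defaults (only the port defaults explicitly, to 9042). A
sketch of tuning them, with illustrative values that are not defaults; note
that, matching getSSLOptions above, SSL only kicks in when both store paths
and both passwords are set:

    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class NativeTuningSketch
    {
        public static void tune(Configuration conf)
        {
            CqlConfigHelper.setInputNativePort(conf, "9042");
            CqlConfigHelper.setInputCoreConnections(conf, "2");
            CqlConfigHelper.setInputMaxConnections(conf, "8");
            CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, "12000"); // milliseconds
            // SSL: all four settings below must be present for getSSLOptions() to apply
            CqlConfigHelper.setInputNativeSSLTruststorePath(conf, "/path/to/truststore.jks");
            CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, "truststore-secret");
            CqlConfigHelper.setInputNativeSSLKeystorePath(conf, "/path/to/keystore.jks");
            CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, "keystore-secret");
        }
    }
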
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
new file mode 100644
index 0000000..e1cdf32
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import com.datastax.driver.core.Row;
+
+/**
+ * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
+ *
+ * At minimum, you need to set the keyspace and column family in your Hadoop job Configuration.
+ * The ConfigHelper class is provided to make this simple:
+ *   ConfigHelper.setInputColumnFamily
+ *
+ * You can also configure the number of rows per InputSplit with
+ *   ConfigHelper.setInputSplitSize. The default split size is 64k rows.
+ *
+ * The number of CQL rows per page is set with
+ *   CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You
+ *   should set it to "as big as possible, but no bigger": it sets the page size of
+ *   the CQL query, so it needs to be large enough to minimize network overhead, but
+ *   not so large that it risks out-of-memory issues.
+ *
+ * Other native protocol connection parameters are in CqlConfigHelper.
+ */
+public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
+{
+    public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
+            throws IOException
+    {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+        {
+            @Override
+            public void progress()
+            {
+                reporter.progress();
+            }
+        };
+
+        CqlRecordReader recordReader = new CqlRecordReader();
+        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit) split, tac);
+        return recordReader;
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
+            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
+            InterruptedException
+    {
+        return new CqlRecordReader();
+    }
+
+}
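
Since the reader relies on the driver's auto-paging, the page row size is
applied as the query fetch size (see getReadQueryOptions in CqlConfigHelper
above) rather than as a LIMIT. A one-method sketch using the value the
javadoc suggests:

    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.hadoop.mapreduce.Job;

    public class PageSizeSketch
    {
        public static void configure(Job job)
        {
            // pages of 1000 CQL rows per driver fetch; if unset, the driver default applies
            CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "1000");
        }
    }
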
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index cee4b4b..b692280 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
         try
         {
-            pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf));
+            pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get();
         }
         catch (NumberFormatException e)
         {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7cb9700/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
new file mode 100644
index 0000000..67f76cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+
+/**
+ * CqlRecordReader reads the rows returned from the CQL query.
+ * It uses CQL auto-paging.
+ *
+ * Returns a Long as the local CQL row key, which starts from 0;
+ *
+ * the value is a Row from the C* java driver's CQL result set.
+ * 1) the select clause must include the partition key columns (to calculate
+ *    the progress based on the actual CF rows processed)
+ * 2) the where clause must include token(partition_key1, ... , partition_keyn) > ? and
+ *    token(partition_key1, ... , partition_keyn) <= ? (in the right order)
+ */
+public class CqlRecordReader extends RecordReader<Long, Row>
+        implements org.apache.hadoop.mapred.RecordReader<Long, Row>
+{
+    private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+
+    private ColumnFamilySplit split;
+    private RowIterator rowIterator;
+
+    private Pair<Long, Row> currentRow;
+    private int totalRowCount; // total number of rows to fetch
+    private String keyspace;
+    private String cfName;
+    private String cqlQuery;
+    private Cluster cluster;
+    private Session session;
+    private IPartitioner partitioner;
+
+    // partition keys -- key aliases
+    private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+
+    public CqlRecordReader()
+    {
+        super();
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+    {
+        this.split = (ColumnFamilySplit) split;
+        Configuration conf = context.getConfiguration();
+        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
+                      ? (int) this.split.getLength()
+                      : ConfigHelper.getInputSplitSize(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        keyspace = ConfigHelper.getInputKeyspace(conf);
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
+        try
+        {
+            if (cluster != null)
+                return;
+
+            // create a native-protocol connection to one of the split's replica locations
+            String[] locations = split.getLocations();
+            Exception lastException = null;
+            for (String location : locations)
+            {
+                try
+                {
+                    cluster = CqlConfigHelper.getInputCluster(location, conf);
+                    break;
+                }
+                catch (Exception e)
+                {
+                    lastException = e;
+                    logger.warn("Failed to create authenticated client to {}", location);
+                }
+            }
+            if (cluster == null && lastException != null)
+                throw lastException;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        if (cluster != null)
+            session = cluster.connect(keyspace);
+        rowIterator = new RowIterator();
+        logger.debug("created {}", rowIterator);
+    }
+
+    public void close()
+    {
+        if (session != null)
+            session.close();
+    }
+
+    public Long getCurrentKey()
+    {
+        return currentRow.left;
+    }
+
+    public Row getCurrentValue()
+    {
+        return currentRow.right;
+    }
+
+    public float getProgress()
+    {
+        if (!rowIterator.hasNext())
+            return 1.0F;
+
+        // the progress is likely to be reported slightly off the actual but close enough
+        float progress = ((float) rowIterator.totalRead / totalRowCount);
+        return progress > 1.0F ? 1.0F : progress;
+    }
+
+    public boolean nextKeyValue() throws IOException
+    {
+        if (!rowIterator.hasNext())
+        {
+            logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount);
+            return false;
+        }
+
+        try
+        {
+            currentRow = rowIterator.next();
+        }
+        catch (Exception e)
+        {
+            // throw it as IOException, so client can catch it and handle it at client side
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
+        }
+        return true;
+    }
+
+    // Because the old Hadoop API wants us to write to the key and value
+    // and the new asks for them, we need to copy the output of the new API
+    // to the old. Thus, expect a small performance hit.
+    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
+    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
+    public boolean next(Long key, Row value) throws IOException
+    {
+        if (nextKeyValue())
+        {
+            ((WrappedRow) value).setRow(getCurrentValue());
+            return true;
+        }
+        return false;
+    }
+
+    public long getPos() throws IOException
+    {
+        return (long) rowIterator.totalRead;
+    }
+
+    public Long createKey()
+    {
+        return new Long(0L);
+    }
+
+    public Row createValue()
+    {
+        return new WrappedRow();
+    }
+
+    /** CQL row iterator
+     *  Input cql query
+     *  1) select clause must include key columns (if we use partition key based row count)
+     *  2) where clause must include token(partition_key1 ... partition_keyn) > ? and
+     *     token(partition_key1 ... partition_keyn) <= ?
+     */
+    private class RowIterator extends AbstractIterator<Pair<Long, Row>>
+    {
+        private long keyId = 0L;
+        protected int totalRead = 0; // total number of cf rows read
+        protected Iterator<Row> rows;
+        private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key
+
+        public RowIterator()
+        {
+            if (session == null)
+                throw new RuntimeException("Can't create connection session");
+
+            AbstractType type = partitioner.getTokenValidator();
+            ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())));
+            for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
+                partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
+            rows = rs.iterator();
+        }
+
+        protected Pair<Long, Row> computeNext()
+        {
+            if (rows == null || !rows.hasNext())
+                return endOfData();
+
+            Row row = rows.next();
+            Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>();
+            for (String column : partitionBoundColumns.keySet())
+                keyColumns.put(column, row.getBytesUnsafe(column));
+
+            // increase total CF row read
+            if (previousRowKey.isEmpty() && !keyColumns.isEmpty())
+            {
+                previousRowKey = keyColumns;
+                totalRead++;
+            }
+            else
+            {
+                for (String column : partitionBoundColumns.keySet())
+                {
+                    if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0)
+                    {
+                        previousRowKey = keyColumns;
+                        totalRead++;
+                        break;
+                    }
+                }
+            }
+            keyId++;
+            return Pair.create(keyId, row);
+        }
+    }
+
+    private class WrappedRow implements Row
+    {
+        private Row row;
+
+        public void setRow(Row row)
+        {
+            this.row = row;
+        }
+
+        @Override
+        public ColumnDefinitions getColumnDefinitions()
+        {
+            return row.getColumnDefinitions();
+        }
+
+        @Override
+        public boolean isNull(int i)
+        {
+            return row.isNull(i);
+        }
+
+        @Override
+        public boolean isNull(String name)
+        {
+            return row.isNull(name);
+        }
+
+        @Override
+        public boolean getBool(int i)
+        {
+            return row.getBool(i);
+        }
+
+        @Override
+        public boolean getBool(String name)
+        {
+            return row.getBool(name);
+        }
+
+        @Override
+        public int getInt(int i)
+        {
+            return row.getInt(i);
+        }
+
+        @Override
+        public int getInt(String name)
+        {
+            return row.getInt(name);
+        }
+
+        @Override
+        public long getLong(int i)
+        {
+            return row.getLong(i);
+        }
+
+        @Override
+        public long getLong(String name)
+        {
+            return row.getLong(name);
+        }
+
+        @Override
+        public Date getDate(int i)
+        {
+            return row.getDate(i);
+        }
+
+        @Override
+        public Date getDate(String name)
+        {
+            return row.getDate(name);
+        }
+
+        @Override
+        public float getFloat(int i)
+        {
+            return row.getFloat(i);
+        }
+
+        @Override
+        public float getFloat(String name)
+        {
+            return row.getFloat(name);
+        }
+
+        @Override
+        public double getDouble(int i)
+        {
+            return row.getDouble(i);
+        }
+
+        @Override
+        public double getDouble(String name)
+        {
+            return row.getDouble(name);
+        }
+
+        @Override
+        public ByteBuffer getBytesUnsafe(int i)
+        {
+            return row.getBytesUnsafe(i);
+        }
+
+        @Override
+        public ByteBuffer getBytesUnsafe(String name)
+        {
+            return row.getBytesUnsafe(name);
+        }
+
+        @Override
+        public ByteBuffer getBytes(int i)
+        {
+            return row.getBytes(i);
+        }
+
+        @Override
+        public ByteBuffer getBytes(String name)
+        {
+            return row.getBytes(name);
+        }
+
+        @Override
+        public String getString(int i)
+        {
+            return row.getString(i);
+        }
+
+        @Override
+        public String getString(String name)
+        {
+            return row.getString(name);
+        }
+
+        @Override
+        public BigInteger getVarint(int i)
+        {
+            return row.getVarint(i);
+        }
+
+        @Override
+        public BigInteger getVarint(String name)
+        {
+            return row.getVarint(name);
+        }
+
+        @Override
+        public BigDecimal getDecimal(int i)
+        {
+            return row.getDecimal(i);
+        }
+
+        @Override
+        public BigDecimal getDecimal(String name)
+        {
+            return row.getDecimal(name);
+        }
+
+        @Override
+        public UUID getUUID(int i)
+        {
+            return row.getUUID(i);
+        }
+
+        @Override
+        public UUID getUUID(String name)
+        {
+            return row.getUUID(name);
+        }
+
+        @Override
+        public InetAddress getInet(int i)
+        {
+            return row.getInet(i);
+        }
+
+        @Override
+        public InetAddress getInet(String name)
+        {
+            return row.getInet(name);
+        }
+
+        @Override
+        public <T> List<T> getList(int i, Class<T> elementsClass)
+        {
+            return row.getList(i, elementsClass);
+        }
+
+        @Override
+        public <T> List<T> getList(String name, Class<T> elementsClass)
+        {
+            return row.getList(name, elementsClass);
+        }
+
+        @Override
+        public <T> Set<T> getSet(int i, Class<T> elementsClass)
+        {
+            return row.getSet(i, elementsClass);
+        }
+
+        @Override
+        public <T> Set<T> getSet(String name, Class<T> elementsClass)
+        {
+            return row.getSet(name, elementsClass);
+        }
+
+        @Override
+        public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
+        {
+            return row.getMap(i, keysClass, valuesClass);
+        }
+
+        @Override
+        public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
+        {
+            return row.getMap(name, keysClass, valuesClass);
+        }
+    }
+}
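
For reference, the contract the new reader presents to the framework,
sketched as the consumption loop Hadoop itself performs (a real run needs a
live cluster, a ColumnFamilySplit and a TaskAttemptContext, so "split" and
"taskAttemptContext" below are assumed to exist; this is illustration only):

    // Sketch of how the framework drives CqlRecordReader (new-API path).
    CqlRecordReader reader = new CqlRecordReader();
    reader.initialize(split, taskAttemptContext);  // connects to a replica of the split
    while (reader.nextKeyValue())
    {
        Long key = reader.getCurrentKey();         // 0-based ordinal within the split
        Row row = reader.getCurrentValue();        // java-driver row
        // ... hand key/row to the mapper ...
    }
    reader.close();                                // closes the driver session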