cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [2/3] cassandra git commit: Fix loading set types in pig with the 2.1 client driver.
Date Tue, 13 Jan 2015 16:27:30 GMT
Fix loading set types in pig with the 2.1 client driver.

Patch by Artem Aliev, reviewed by brandonwilliams for CASSANDRA-8577


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1cb426b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1cb426b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1cb426b9

Branch: refs/heads/trunk
Commit: 1cb426b9831b42b5f368eac51a6e3bebdb1bd62a
Parents: 0757dc7
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Tue Jan 13 10:23:28 2015 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Tue Jan 13 10:23:28 2015 -0600

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/cql3/CqlConfigHelper.java   | 11 +++++++++++
 .../apache/cassandra/hadoop/cql3/CqlRecordReader.java   | 12 ++++++++++++
 .../cassandra/hadoop/pig/AbstractCassandraStorage.java  |  5 ++++-
 .../apache/cassandra/hadoop/pig/CqlNativeStorage.java   |  4 ++++
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 2be811f..7d65663 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -81,6 +81,8 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
     private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
 
+    private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
+
     private static final String OUTPUT_CQL = "cassandra.output.cql";
     
     /**
@@ -279,6 +281,10 @@ public class CqlConfigHelper
         return conf.get(OUTPUT_CQL);
     }
 
+    private static Optional<Integer> getProtocolVersion(Configuration conf) {
+        return getIntSetting(INPUT_NATIVE_PROTOCOL_VERSION, conf);
+    }
+
     public static Cluster getInputCluster(String host, Configuration conf)
     {
         // this method has been left for backward compatibility
@@ -290,6 +296,7 @@ public class CqlConfigHelper
         int port = getInputNativePort(conf);
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+        Optional<Integer> protocolVersion = getProtocolVersion(conf);
         LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
@@ -305,6 +312,9 @@ public class CqlConfigHelper
         if (sslOptions.isPresent())
             builder.withSSL(sslOptions.get());
 
+        if (protocolVersion.isPresent()) {
+            builder.withProtocolVersion(protocolVersion.get());
+        }
         builder.withLoadBalancingPolicy(loadBalancingPolicy)
                .withSocketOptions(socketOptions)
                .withQueryOptions(queryOptions)
@@ -313,6 +323,7 @@ public class CqlConfigHelper
         return builder.build();
     }
 
+
     public static void setInputCoreConnections(Configuration conf, String connections)
     {
         conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9c1118b..6a1f5bf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,6 +89,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+    protected int nativeProtocolVersion = 1;
 
     public CqlRecordReader()
     {
@@ -129,6 +130,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         if (session == null)
           throw new RuntimeException("Can't create connection session");
 
+        //get negotiated serialization protocol
+        nativeProtocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
+
         // If the user provides a CQL query then we will use it without validation
         // otherwise we will fall back to building a query using the:
         //   inputColumns
@@ -230,6 +234,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         return new WrappedRow();
     }
 
+    /**
+     * Return native version protocol of the cluster connection
+     * @return serialization protocol version.
+     */
+    public int getNativeProtocolVersion() {
+        return nativeProtocolVersion;
+    }
+
     /** CQL row iterator 
      *  Input cql query  
      *  1) select clause must include key columns (if we use partition key based row count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 361baa4..035f99a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
+
     protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
 
     // system environment variables that can be set to configure connection info:
@@ -101,6 +102,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected boolean usePartitionFilter = false;
     protected String initHostAddress;
     protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
+
 
     public AbstractCassandraStorage()
     {
@@ -793,7 +796,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         {
             // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
             // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
-            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, 1);
+            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
         }
 
         return validator.compose(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 3c59a1c..f0bb8f9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -78,6 +79,9 @@ public class CqlNativeStorage extends CqlStorage
     public void prepareToRead(RecordReader reader, PigSplit split)
     {
         this.reader = reader;
+        if (reader instanceof CqlRecordReader) {
+            nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
+        }
     }
 
     /** get next row */


Mime
View raw message