cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [3/3] git commit: Configurable transport for CFRR/CFRW. Patch by Piotr Kołaczkowski, reviewed by brandonwilliams for CASSANDRA-4558
Date Mon, 20 Aug 2012 20:32:54 GMT
Configurable transport for CFRR/CFRW.
Patch by Piotr Kołaczkowski, reviewed by brandonwilliams for
CASSANDRA-4558


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7db46ef8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7db46ef8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7db46ef8

Branch: refs/heads/cassandra-1.1
Commit: 7db46ef80acc567a6ac3e4edcfcf39a6b22b73fa
Parents: f5619bb
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Mon Aug 20 15:27:53 2012 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Mon Aug 20 15:27:53 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/ColumnFamilyInputFormat.java  |    2 +-
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |   12 ++-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |    6 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   55 ++++++++++++---
 .../apache/cassandra/thrift/ITransportFactory.java |   36 ++++++++++
 .../cassandra/thrift/TFramedTransportFactory.java  |   37 ++++++++++
 6 files changed, 131 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 354903d..cb79b01 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -252,7 +252,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer,
SortedMap<B
 
             try
             {
-                Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getInputRpcPort(conf),
true);
+                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
                 client.set_keyspace(keyspace);
                 return client.describe_splits(cfName, range.start_token, range.end_token,
splitsize);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 668c4aa..e01ada5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +36,10 @@ import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
+import javax.security.auth.login.LoginException;
+
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
@@ -149,11 +151,12 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
      * @throws AuthorizationException
      */
     public static Cassandra.Client createAuthenticatedClient(TSocket socket, Configuration
conf)
-    throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
+            throws InvalidRequestException, TException, AuthenticationException, AuthorizationException,
LoginException
     {
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
+        logger.debug("Creating authenticated client for CF output format");
+        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket);
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        socket.open();
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
         if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
         {
@@ -163,6 +166,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
             AuthenticationRequest authRequest = new AuthenticationRequest(creds);
             client.login(authRequest);
         }
+        logger.debug("Authenticated client for CF output format created successfully");
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index d35f142..fc90e5c 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
+import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer,
IColumn>>
@@ -160,9 +160,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer,
SortedMap
             // create connection using thrift
             String location = getLocation();
             socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
+            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket);
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
             client = new Cassandra.Client(binaryProtocol);
-            socket.open();
 
             // log in
             client.set_keyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 87dd5e0..1646635 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -40,11 +40,12 @@ import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import javax.security.auth.login.LoginException;
+
 
 public class ConfigHelper
 {
@@ -73,6 +74,8 @@ public class ConfigHelper
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
+    private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
+    private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -462,7 +465,7 @@ public class ConfigHelper
         return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","),
ConfigHelper.getInputRpcPort(conf));
     }
 
-        public static Cassandra.Client getClientFromOutputAddressList(Configuration conf)
throws IOException
+    public static Cassandra.Client getClientFromOutputAddressList(Configuration conf) throws
IOException
     {
         return getClientFromAddressList(conf, ConfigHelper.getOutputInitialAddress(conf).split(","),
ConfigHelper.getOutputRpcPort(conf));
     }
@@ -475,7 +478,7 @@ public class ConfigHelper
         {
             try
             {
-                client = createConnection(address, port, true);
+                client = createConnection(conf, address, port);
                 break;
             }
             catch (IOException ioe)
@@ -495,19 +498,53 @@ public class ConfigHelper
         return client;
     }
 
-    public static Cassandra.Client createConnection(String host, Integer port, boolean framed)
+    public static Cassandra.Client createConnection(Configuration conf, String host, Integer
port)
             throws IOException
     {
-        TSocket socket = new TSocket(host, port);
-        TTransport trans = framed ? new TFramedTransport(socket) : socket;
         try
         {
-            trans.open();
+            TSocket socket = new TSocket(host, port);
+            TTransport transport = getInputTransportFactory(conf).openTransport(socket);
+            return new Cassandra.Client(new TBinaryProtocol(transport));
+        }
+        catch (LoginException e)
+        {
+            throw new IOException("Unable to login to server " + host + ":" + port, e);
         }
         catch (TTransportException e)
         {
-            throw new IOException("unable to connect to server", e);
+            throw new IOException("Unable to connect to server " + host + ":" + port, e);
+        }
+    }
+
+    public static ITransportFactory getInputTransportFactory(Configuration conf)
+    {
+        return getTransportFactory(conf.get(INPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName()));
+    }
+
+    public static void setInputTransportFactoryClass(Configuration conf, String classname)
+    {
+        conf.set(INPUT_TRANSPORT_FACTORY_CLASS, classname);
+    }
+
+    public static ITransportFactory getOutputTransportFactory(Configuration conf)
+    {
+        return getTransportFactory(conf.get(OUTPUT_TRANSPORT_FACTORY_CLASS, TFramedTransportFactory.class.getName()));
+    }
+
+    public static void setOutputTransportFactoryClass(Configuration conf, String classname)
+    {
+        conf.set(OUTPUT_TRANSPORT_FACTORY_CLASS, classname);
+    }
+
+    private static ITransportFactory getTransportFactory(String factoryClassName) {
+        try
+        {
+            return (ITransportFactory) Class.forName(factoryClassName).newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName,
e);
         }
-        return new Cassandra.Client(new TBinaryProtocol(trans));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
new file mode 100644
index 0000000..47cd034
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.thrift;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+
+
+public interface ITransportFactory
+{
+    TTransport openTransport(TSocket socket) throws LoginException, TTransportException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7db46ef8/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
new file mode 100644
index 0000000..09ae99e
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.thrift;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TFramedTransportFactory implements ITransportFactory
+{
+    public TTransport openTransport(TSocket socket) throws TTransportException
+    {
+        TTransport transport = new TFramedTransport(socket);
+        transport.open();
+        return transport;
+    }
+}


Mime
View raw message