cassandra-commits mailing list archives

From brandonwilli...@apache.org
Subject [2/6] git commit: Make CPRR more robust to failures. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-6302
Date Tue, 05 Nov 2013 21:29:31 GMT
Make CPRR more robust to failures.
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-6302
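
For context, the robustness change in CqlPagingRecordReader (CPRR) amounts to trying each replica
location reported for the split, in order, and only giving up once every location has failed. Below
is a minimal standalone sketch of that connect-with-failover pattern; the Client class, connect()
helper, and host names are placeholders for illustration, not the actual Cassandra/Hadoop/Thrift
plumbing used in the patch:

    import java.util.Arrays;
    import java.util.List;

    public class FailoverConnectSketch
    {
        static class Client {}

        // Placeholder standing in for CqlPagingInputFormat.createAuthenticatedClient(...)
        static Client connect(String host, int port) throws Exception
        {
            if ("replica1".equals(host))
                throw new Exception("cannot reach " + host + ":" + port);
            return new Client();
        }

        // Try each location in turn; keep the last failure so the caller sees a real cause.
        static Client connectToAny(List<String> locations, int port) throws Exception
        {
            Exception lastException = null;
            for (String location : locations)
            {
                try
                {
                    return connect(location, port); // first successful connection wins
                }
                catch (Exception e)
                {
                    lastException = e;              // remember why this host failed
                }
            }
            throw lastException != null ? lastException
                                        : new Exception("no locations to try");
        }

        public static void main(String[] args) throws Exception
        {
            // replica1 is unreachable in this sketch, so the second location is used.
            Client client = connectToAny(Arrays.asList("replica1", "replica2"), 9160);
            System.out.println("connected: " + client);
        }
    }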


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e7d7285
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e7d7285
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e7d7285

Branch: refs/heads/cassandra-2.0
Commit: 8e7d7285cdeac4f2527c933280d595bbddd26935
Parents: cd7b05f
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Tue Nov 5 15:26:07 2013 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Tue Nov 5 15:26:07 2013 -0600

----------------------------------------------------------------------
 .../hadoop/cql3/CqlPagingRecordReader.java      | 32 ++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e7d7285/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index d1a089f..b6e793c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -135,10 +135,24 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
                 return;
 
             // create connection using thrift
-            String location = getLocation();
-
-            int port = ConfigHelper.getInputRpcPort(conf);
-            client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
+            String[] locations = split.getLocations();
+            Exception lastException = null;
+            for (String location : locations)
+            {
+                int port = ConfigHelper.getInputRpcPort(conf);
+                try
+                {
+                    client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf);
+                    break;
+                }
+                catch (Exception e)
+                {
+                    lastException = e;
+                    logger.warn("Failed to create authenticated client to {}:{}", location, port);
+                }
+            }
+            if (client == null && lastException != null)
+                throw lastException;
 
             // retrieve partition keys and cluster keys from system.schema_columnfamilies table
             retrieveKeys();
@@ -210,7 +224,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
     // we don't use endpointsnitch since we are trying to support hadoop nodes that are
     // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
-    private String getLocation()
+    private String[] getLocations()
     {
         Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
 
@@ -229,11 +243,11 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
                 }
                 if (address.equals(locationAddress))
                 {
-                    return location;
+                    return new String[] { location };
                 }
             }
         }
-        return split.getLocations()[0];
+        return split.getLocations();
     }
 
     // Because the old Hadoop API wants us to write to the key and value
@@ -434,8 +448,8 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>,
 
                 columns = withoutKeyColumns(columns);
                 columns = (clusterKey == null || "".equals(clusterKey))
-                        ? partitionKey + "," + columns
-                        : partitionKey + "," + clusterKey + "," + columns;
+                        ? partitionKey + (columns != null ? ("," + columns) : "")
+                        : partitionKey + "," + clusterKey + (columns != null ? ("," + columns) : "");
             }
 
             String whereStr = userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses;
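
The last hunk also keeps a null column list from leaking into the generated SELECT as a literal
",null". In isolation, with made-up values standing in for the reader's fields, the guarded
concatenation behaves like this:

    public class SelectListSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical values standing in for the reader's partition key,
            // clustering key, and remaining column list.
            String partitionKey = "k";
            String clusterKey = "c";
            String columns = null; // e.g. every requested column was a key column

            String selectList = (clusterKey == null || "".equals(clusterKey))
                    ? partitionKey + (columns != null ? ("," + columns) : "")
                    : partitionKey + "," + clusterKey + (columns != null ? ("," + columns) : "");

            System.out.println(selectList); // prints "k,c" rather than "k,c,null"
        }
    }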

