cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [1/3] cassandra git commit: More fixes to connection error handling in CqlRecordWriter
Date Fri, 22 May 2015 17:51:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 5ab14968e -> 0d24b1a80
  refs/heads/trunk 491f7dc27 -> 8d1a50e09


More fixes to connection error handling in CqlRecordWriter

Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9442


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d24b1a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d24b1a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d24b1a8

Branch: refs/heads/cassandra-2.2
Commit: 0d24b1a802f67641b534cf2a407342f6129862ef
Parents: 5ab1496
Author: Philip Thompson <ptnapoleon@gmail.com>
Authored: Wed May 20 13:56:31 2015 -0400
Committer: Sam Tunnicliffe <sam@beobal.com>
Committed: Fri May 22 18:39:59 2015 +0100

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 57 +++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d24b1a8/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index c507197..91753a2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -26,30 +26,18 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import com.datastax.driver.core.exceptions.AuthenticationException;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TokenRange;
-import org.apache.cassandra.db.marshal.AbstractType;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
 import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -214,7 +202,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
-	final InetAddress address = ringCache.getEndpoints(range).get(0);
+        final InetAddress address = ringCache.getEndpoints(range).get(0);
         RangeClient client = clients.get(address);
         if (client == null)
         {
@@ -325,8 +313,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                         //There are too many ways for the Thread.interrupted() state to be cleared, so
                         //we can't rely on that here. Until the java driver gives us a better way of knowing
                         //that this exception came from an InterruptedException, this is the best solution.
-                        if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted")) {
-                            lastException = new IOException(e);
+                        if (canRetryDriverConnection(e))
+                        {
                             iter.previous();
                         }
                         closeInternal();
@@ -417,7 +405,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 throw lastException;
         }
 
-
         protected void closeInternal()
         {
             if (client != null)
@@ -425,6 +412,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 client.close();;
             }
         }
+
+        private boolean canRetryDriverConnection(Exception e)
+        {
+            if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted"))
+                return true;
+            if (e instanceof NoHostAvailableException)
+            {
+                if (((NoHostAvailableException) e).getErrors().values().size() == 1)
+                {
+                    Throwable cause = ((NoHostAvailableException) e).getErrors().values().iterator().next();
+                    if (cause != null && cause.getCause() instanceof java.nio.channels.ClosedByInterruptException)
+                    {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
     }
 
     private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)


Mime
View raw message