cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [1/3] cassandra git commit: Bulk Loader API could not tolerate even node failure
Date Thu, 01 Oct 2015 12:20:02 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 40f4daa36 -> 1f99e7039


Bulk Loader API could not tolerate even node failure

patch by Paulo Motta; reviewed by Carl Yeksigian for CASSANDRA-10347


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31fc6d25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31fc6d25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31fc6d25

Branch: refs/heads/cassandra-3.0
Commit: 31fc6d25fd5edeb1f6ed671076e94be72f9b8dc7
Parents: c37562e
Author: Paulo Motta <pauloricardomg@gmail.com>
Authored: Thu Oct 1 14:14:45 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Thu Oct 1 14:14:45 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/AbstractBulkOutputFormat.java        | 32 ++++++++++++++++++++
 .../hadoop/AbstractBulkRecordWriter.java        | 16 ++++++++--
 3 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ad2b36..eec8161 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.10
+ * Bulk Loader API could not tolerate even node failure (CASSANDRA-10347)
  * Avoid misleading pushed notifications when multiple nodes
    share an rpc_address (CASSANDRA-10052)
  * Fix dropping undroppable when message queue is full (CASSANDRA-10113)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
index c0e91da..e893ba6 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.hadoop;
 
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
@@ -70,4 +71,35 @@ public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
 
         public void setupTask(TaskAttemptContext taskContext) { }
     }
+
+    /**
+     * Set the hosts to ignore as comma delimited values.
+     * Data will not be bulk loaded onto the ignored nodes.
+     * @param conf job configuration
+     * @param ignoreNodesCsv a comma delimited list of nodes to ignore
+     */
+    public static void setIgnoreHosts(Configuration conf, String ignoreNodesCsv)
+    {
+        conf.set(AbstractBulkRecordWriter.IGNORE_HOSTS, ignoreNodesCsv);
+    }
+
+    /**
+     * Set the hosts to ignore. Data will not be bulk loaded onto the ignored nodes.
+     * @param conf job configuration
+     * @param ignoreNodes the nodes to ignore
+     */
+    public static void setIgnoreHosts(Configuration conf, String... ignoreNodes)
+    {
+        conf.setStrings(AbstractBulkRecordWriter.IGNORE_HOSTS, ignoreNodes);
+    }
+
+    /**
+     * Get the hosts to ignore as a collection of strings
+     * @param conf job configuration
+     * @return the nodes to ignore as a collection of strings
+     */
+    public static Collection<String> getIgnoreHosts(Configuration conf)
+    {
+        return conf.getStringCollection(AbstractBulkRecordWriter.IGNORE_HOSTS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
index 22255a6..f9322c7 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -60,12 +61,14 @@ implements org.apache.hadoop.mapred.RecordWriter<K, V>
     public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
     public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
     public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+    public static final String IGNORE_HOSTS = "mapreduce.output.bulkoutputformat.ignorehosts";
     
     private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
     
     protected final Configuration conf;
     protected final int maxFailures;
-    protected final int bufferSize; 
+    protected final int bufferSize;
+    protected final Set<InetAddress> ignores = new HashSet<>();
     protected Closeable writer;
     protected SSTableLoader loader;
     protected Progressable progress;
@@ -91,6 +94,15 @@ implements org.apache.hadoop.mapred.RecordWriter<K, V>
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
         maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
         bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+        try
+        {
+            for (String hostToIgnore : AbstractBulkOutputFormat.getIgnoreHosts(conf))
+                ignores.add(InetAddress.getByName(hostToIgnore));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(("Unknown host: " + e.getMessage()));
+        }
     }
 
     protected String getOutputLocation() throws IOException
@@ -119,7 +131,7 @@ implements org.apache.hadoop.mapred.RecordWriter<K, V>
         if (writer != null)
         {
             writer.close();
-            Future<StreamState> future = loader.stream();
+            Future<StreamState> future = loader.stream(ignores);
             while (true)
             {
                 try


Mime
View raw message