Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4DA5317321 for ; Thu, 1 Oct 2015 12:20:03 +0000 (UTC) Received: (qmail 13260 invoked by uid 500); 1 Oct 2015 12:20:03 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 13233 invoked by uid 500); 1 Oct 2015 12:20:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 13211 invoked by uid 99); 1 Oct 2015 12:20:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Oct 2015 12:20:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9AFEE051D; Thu, 1 Oct 2015 12:20:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: snazy@apache.org To: commits@cassandra.apache.org Date: Thu, 01 Oct 2015 12:20:02 -0000 Message-Id: <63474b6483c84beb921f090fee511397@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: Bulk Loader API could not tolerate even node failure Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 40f4daa36 -> 1f99e7039 Bulk Loader API could not tolerate even node failure patch by Paulo Motta; reviewed by Carl Yeksigian for CASSANDRA-10347 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31fc6d25 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31fc6d25 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31fc6d25 Branch: 
refs/heads/cassandra-3.0 Commit: 31fc6d25fd5edeb1f6ed671076e94be72f9b8dc7 Parents: c37562e Author: Paulo Motta Authored: Thu Oct 1 14:14:45 2015 +0200 Committer: Robert Stupp Committed: Thu Oct 1 14:14:45 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../hadoop/AbstractBulkOutputFormat.java | 32 ++++++++++++++++++++ .../hadoop/AbstractBulkRecordWriter.java | 16 ++++++++-- 3 files changed, 47 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0ad2b36..eec8161 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.10 + * Bulk Loader API could not tolerate even node failure (CASSANDRA-10347) * Avoid misleading pushed notifications when multiple nodes share an rpc_address (CASSANDRA-10052) * Fix dropping undroppable when message queue is full (CASSANDRA-10113) http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java index c0e91da..e893ba6 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.cassandra.hadoop; import java.io.IOException; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; @@ -70,4 +71,35 @@ public abstract class AbstractBulkOutputFormat extends OutputFormat public void setupTask(TaskAttemptContext taskContext) { } } + + /** + * Set the hosts to ignore as comma delimited 
values. + * Data will not be bulk loaded onto the ignored nodes. + * @param conf job configuration + * @param ignoreNodesCsv a comma delimited list of nodes to ignore + */ + public static void setIgnoreHosts(Configuration conf, String ignoreNodesCsv) + { + conf.set(AbstractBulkRecordWriter.IGNORE_HOSTS, ignoreNodesCsv); + } + + /** + * Set the hosts to ignore. Data will not be bulk loaded onto the ignored nodes. + * @param conf job configuration + * @param ignoreNodes the nodes to ignore + */ + public static void setIgnoreHosts(Configuration conf, String... ignoreNodes) + { + conf.setStrings(AbstractBulkRecordWriter.IGNORE_HOSTS, ignoreNodes); + } + + /** + * Get the hosts to ignore as a collection of strings + * @param conf job configuration + * @return the nodes to ignore as a collection of strings + */ + public static Collection getIgnoreHosts(Configuration conf) + { + return conf.getStringCollection(AbstractBulkRecordWriter.IGNORE_HOSTS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/31fc6d25/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java index 22255a6..f9322c7 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -60,12 +61,14 @@ implements org.apache.hadoop.mapred.RecordWriter public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; public final static String STREAM_THROTTLE_MBITS = 
"mapreduce.output.bulkoutputformat.streamthrottlembits"; public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; + public static final String IGNORE_HOSTS = "mapreduce.output.bulkoutputformat.ignorehosts"; private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class); protected final Configuration conf; protected final int maxFailures; - protected final int bufferSize; + protected final int bufferSize; + protected final Set ignores = new HashSet<>(); protected Closeable writer; protected SSTableLoader loader; protected Progressable progress; @@ -91,6 +94,15 @@ implements org.apache.hadoop.mapred.RecordWriter DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); + try + { + for (String hostToIgnore : AbstractBulkOutputFormat.getIgnoreHosts(conf)) + ignores.add(InetAddress.getByName(hostToIgnore)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(("Unknown host: " + e.getMessage())); + } } protected String getOutputLocation() throws IOException @@ -119,7 +131,7 @@ implements org.apache.hadoop.mapred.RecordWriter if (writer != null) { writer.close(); - Future future = loader.stream(); + Future future = loader.stream(ignores); while (true) { try