From: snazy@apache.org
To: commits@cassandra.apache.org
Date: Thu, 01 Oct 2015 12:20:03 -0000
Subject: [2/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8123b3b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8123b3b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8123b3b0

Branch: refs/heads/cassandra-3.0
Commit: 8123b3b07dbeee3628d98447651724c731e1170b
Parents: 25de92e 31fc6d2
Author: Robert Stupp
Authored: Thu Oct 1 14:16:40 2015 +0200
Committer: Robert Stupp
Committed: Thu Oct 1 14:16:40 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                   |  1 +
 .../hadoop/cql3/CqlBulkOutputFormat.java      | 32 ++++++++++++++++++++
 .../hadoop/cql3/CqlBulkRecordWriter.java      | 13 +++++++-
 3 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8123b3b0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 45070b2,eec8161..9c70c74
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
-2.1.10
+2.2.2
+ * cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369)
+ * Configurable page size in cqlsh (CASSANDRA-9855)
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
+ * Cancel transaction for sstables we wont redistribute index summary
+   for (CASSANDRA-10270)
+ * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
+ * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
+ * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
+ * Fix repair hang when snapshot failed (CASSANDRA-10057)
+ * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
+   (CASSANDRA-10199)
+Merged from 2.1:
+ * Bulk Loader API could not tolerate even a single node failure (CASSANDRA-10347)
  * Avoid misleading pushed notifications when multiple nodes share an rpc_address (CASSANDRA-10052)
  * Fix dropping undroppable when message queue is full (CASSANDRA-10113)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8123b3b0/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 3899f8c,7fedb41..051447c
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@@ -20,10 -20,11 +20,11 @@@ package org.apache.cassandra.hadoop.cql
 import java.io.IOException;
 import java.nio.ByteBuffer;
++import java.util.Collection;
 import java.util.List;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
@@@ -79,99 -85,124 +80,130 @@@ public class CqlBulkOutputFormat extend
     {
         return new CqlBulkRecordWriter(context);
     }
-
-    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
-    {
-        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
-    }
-    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    @Override
+    public void checkOutputSpecs(JobContext context)
     {
-        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
     }
-    public static void setStoragePort(Configuration conf, int port)
+    private void checkOutputSpecs(Configuration conf)
     {
-        conf.set(OUTPUT_CQL_STORAGE_PORT, "" + port);
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+        }
     }
-    public static void setSSLStoragePort(Configuration conf, int port)
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
     {
-        conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, "" + port);
+        checkOutputSpecs(job);
     }
-    public static void setInternodeEncryption(Configuration conf, String encrypt)
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        conf.set(INTERNODE_ENCRYPTION, encrypt);
+        return new NullOutputCommitter();
     }
-
-    public static void setServerKeystore(Configuration conf, String keystore)
+
+    public static void setTableSchema(Configuration conf, String columnFamily, String schema)
     {
-        conf.set(SERVER_KEYSTORE, keystore);
+        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
     }
-    public static void setServerKeystorePassword(Configuration conf, String keystorePass)
+    public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
     {
-        conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass);
+        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
-
-    public static void setServerTruststore(Configuration conf, String truststore)
+
+    public static String getTableSchema(Configuration conf, String columnFamily)
     {
-        conf.set(SERVER_TRUSTSTORE, truststore);
+        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
+        if (schema == null)
+        {
+            throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
+        }
+        return schema;
     }
-
-    public static void setServerTruststorePassword(Configuration conf, String truststorePass)
+
+    public static String getTableInsertStatement(Configuration conf, String columnFamily)
     {
-        conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass);
+        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
+        if (insert == null)
+        {
+            throw new UnsupportedOperationException("You must set the Table insert statement using setTableSchema.");
+        }
+        return insert;
     }
-
-    public static void setServerCipherSuites(Configuration conf, String cipherSuites)
+
+    public static void setDeleteSourceOnSuccess(Configuration conf, boolean deleteSrc)
     {
-        conf.set(SERVER_CIPHER_SUITES, cipherSuites);
+        conf.setBoolean(DELETE_SOURCE, deleteSrc);
     }
-
-    public static int getStoragePort(Configuration conf)
+
+    public static boolean getDeleteSourceOnSuccess(Configuration conf)
     {
-        return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT);
+        return conf.getBoolean(DELETE_SOURCE, false);
     }
-
-    public static int getSSLStoragePort(Configuration conf)
+
+    public static void setTableAlias(Configuration conf, String alias, String columnFamily)
     {
-        return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT);
+        conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
     }
-
-    public static String getInternodeEncryption(Configuration conf)
+
+    public static String getTableForAlias(Configuration conf, String alias)
     {
-        return conf.get(INTERNODE_ENCRYPTION, EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name());
+        return conf.get(TABLE_ALIAS_PREFIX + alias);
     }
-    public static String getServerKeystore(Configuration conf)
++    /**
++     * Set the hosts to ignore as comma delimited values.
++     * Data will not be bulk loaded onto the ignored nodes.
++     * @param conf job configuration
++     * @param ignoreNodesCsv a comma delimited list of nodes to ignore
++     */
++    public static void setIgnoreHosts(Configuration conf, String ignoreNodesCsv)
+    {
-        return conf.get(SERVER_KEYSTORE);
++        conf.set(CqlBulkRecordWriter.IGNORE_HOSTS, ignoreNodesCsv);
+    }
+
-    public static String getServerTruststore(Configuration conf)
++    /**
++     * Set the hosts to ignore. Data will not be bulk loaded onto the ignored nodes.
++     * @param conf job configuration
++     * @param ignoreNodes the nodes to ignore
++     */
++    public static void setIgnoreHosts(Configuration conf, String... ignoreNodes)
+    {
-        return conf.get(SERVER_TRUSTSTORE);
++        conf.setStrings(CqlBulkRecordWriter.IGNORE_HOSTS, ignoreNodes);
+    }
+
-    public static String getServerKeystorePassword(Configuration conf)
++    /**
++     * Get the hosts to ignore as a collection of strings
++     * @param conf job configuration
++     * @return the nodes to ignore as a collection of stirngs
++     */
++    public static Collection getIgnoreHosts(Configuration conf)
+    {
-        return conf.get(SERVER_KEYSTORE_PASSWORD);
++        return conf.getStringCollection(CqlBulkRecordWriter.IGNORE_HOSTS);
+    }
+
-    public static String getServerTruststorePassword(Configuration conf)
+    public static class NullOutputCommitter extends OutputCommitter
     {
-        return conf.get(SERVER_TRUSTSTORE_PASSWORD);
-    }
+        public void abortTask(TaskAttemptContext taskContext) { }
-    public static String getServerCipherSuites(Configuration conf)
-    {
-        return conf.get(SERVER_CIPHER_SUITES);
-    }
+        public void cleanupJob(JobContext jobContext) { }
-    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
-    {
-        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
-        if (schema == null)
-        {
-            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
-        }
-        return schema;
-    }
-
-    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
-    {
-        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
-        if (insert == null)
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
         {
-            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+            return false;
         }
-        return insert;
-    }
-
-    public static void setDeleteSourceOnSuccess(Configuration conf, boolean deleteSrc)
-    {
-        conf.setBoolean(DELETE_SOURCE, deleteSrc);
-    }
-
-    public static boolean getDeleteSourceOnSuccess(Configuration conf)
-    {
-        return conf.getBoolean(DELETE_SOURCE, false);
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
     }
 }
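
----------------------------------------------------------------------
For illustration only, a minimal driver sketch of how a job might wire the
setters above together. The CqlBulkOutputFormat calls are the ones added or
renamed in this diff; the ConfigHelper calls, Job wiring, keyspace/table names
and CQL strings are assumptions made for the example.

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadJobSketch
    {
        public static Job configure(Configuration conf) throws Exception
        {
            // Target keyspace/table; checkOutputSpecs() above fails fast if the keyspace is missing.
            ConfigHelper.setOutputColumnFamily(conf, "ks", "events");        // assumed helper
            ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1,10.0.0.2"); // assumed helper
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");   // assumed helper

            // Schema and prepared insert consumed by CQLSSTableWriter (setters from this diff).
            CqlBulkOutputFormat.setTableSchema(conf, "events",
                "CREATE TABLE ks.events (id text PRIMARY KEY, payload text)");
            CqlBulkOutputFormat.setTableInsertStatement(conf, "events",
                "INSERT INTO ks.events (id, payload) VALUES (?, ?)");

            // New with CASSANDRA-10347: nodes listed here are skipped when SSTables are streamed.
            CqlBulkOutputFormat.setIgnoreHosts(conf, "10.0.0.3", "10.0.0.4");

            Job job = Job.getInstance(conf, "bulk-load-events");
            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            return job;
        }
    }
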
"mapreduce.output.bulkoutputformat.localdir"; + public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; + public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; + public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; ++ public final static String IGNORE_HOSTS = "mapreduce.output.bulkoutputformat.ignorehosts"; + + private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class); + + protected final Configuration conf; + protected final int maxFailures; + protected final int bufferSize; + protected Closeable writer; + protected SSTableLoader loader; + protected Progressable progress; + protected TaskAttemptContext context; ++ protected final Set ignores = new HashSet<>(); + private String keyspace; - private String columnFamily; + private String table; private String schema; private String insertStatement; private File outputDir; @@@ -115,64 -90,45 +117,73 @@@ { // if anything is missing, exceptions will be thrown here, instead of on write() keyspace = ConfigHelper.getOutputKeyspace(conf); - columnFamily = ConfigHelper.getOutputColumnFamily(conf); - schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily); - insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily); - outputDir = getColumnFamilyDirectory(); + table = ConfigHelper.getOutputColumnFamily(conf); + + // check if table is aliased + String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table); + if (aliasedCf != null) + table = aliasedCf; + + schema = CqlBulkOutputFormat.getTableSchema(conf, table); + insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table); + outputDir = getTableDirectory(); deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf); + try + { + partitioner = ConfigHelper.getInputPartitioner(conf); + } + catch (Exception e) + { + partitioner = Murmur3Partitioner.instance; + } ++ try ++ { ++ for (String hostToIgnore : CqlBulkOutputFormat.getIgnoreHosts(conf)) ++ ignores.add(InetAddress.getByName(hostToIgnore)); ++ } ++ catch (UnknownHostException e) ++ { ++ throw new RuntimeException(("Unknown host: " + e.getMessage())); ++ } + } + + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; } - private void prepareWriter() throws IOException { - try + if (writer == null) { - if (writer == null) - { - writer = CQLSSTableWriter.builder() - .forTable(schema) - .using(insertStatement) - .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) - .inDirectory(outputDir) - .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) - .build(); - } - if (loader == null) - { - BulkLoader.ExternalClient externalClient = getExternalClient(conf); - this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) { - @Override - public void onSuccess(StreamState finalState) - { - if (deleteSrc) - FileUtils.deleteRecursive(outputDir); - } - }; - } + writer = CQLSSTableWriter.builder() + .forTable(schema) + .using(insertStatement) + .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) + .inDirectory(outputDir) + .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) + 
+                     .withPartitioner(partitioner)
+                     .build();
         }
-        catch (Exception e)
+
+        if (loader == null)
         {
-            throw new IOException(e);
-        }
+            ExternalClient externalClient = new ExternalClient(conf);
+            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+            {
+                @Override
+                public void onSuccess(StreamState finalState)
+                {
+                    if (deleteSrc)
+                        FileUtils.deleteRecursive(outputDir);
+                }
+            };
+        }
     }
     /**
@@@ -220,82 -175,52 +231,82 @@@
         return dir;
     }
-    private BulkLoader.ExternalClient getExternalClient(Configuration conf)
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    private void close() throws IOException
     {
-        Set hosts = new HashSet();
-        String outputAddress = ConfigHelper.getOutputInitialAddress(conf);
-        if (outputAddress == null) outputAddress = "localhost";
-        String[] nodes = outputAddress.split(",");
-        for (String node : nodes)
+        if (writer != null)
         {
-            try
+            writer.close();
-            Future future = loader.stream();
++            Future future = loader.stream(ignores);
+            while (true)
             {
-                hosts.add(InetAddress.getByName(node));
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
             }
-            catch (UnknownHostException e)
+            if (loader.getFailedHosts().size() > 0)
             {
-                throw new RuntimeException(e);
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
             }
         }
-        int rpcPort = ConfigHelper.getOutputRpcPort(conf);
-        String username = ConfigHelper.getOutputKeyspaceUserName(conf);
-        String password = ConfigHelper.getOutputKeyspacePassword(conf);
-        ITransportFactory transportFactory = ConfigHelper.getClientTransportFactory(conf);
-        return new BulkLoader.ExternalClient(hosts,
-                                             rpcPort,
-                                             username,
-                                             password,
-                                             transportFactory,
-                                             CqlBulkOutputFormat.getStoragePort(conf),
-                                             CqlBulkOutputFormat.getSSLStoragePort(conf),
-                                             getServerEncryptOpt(conf));
     }
-
-    private ServerEncryptionOptions getServerEncryptOpt(Configuration conf)
+
+    public static class ExternalClient extends NativeSSTableLoaderClient
     {
-        ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions();
-        String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf);
-        if (StringUtils.isEmpty(internodeEncrypt))
-            return encryptOpt;
-
-        encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt);
-        encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf);
-        encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf);
-        encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf);
-        encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf);
-        String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf);
-        if (!StringUtils.isEmpty(cipherSuites))
-            encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(",");
-        return encryptOpt;
+        public ExternalClient(Configuration conf)
+        {
+            super(resolveHostAddresses(conf),
+                  CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputKeyspaceUserName(conf),
+                  ConfigHelper.getOutputKeyspacePassword(conf),
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
+        }
+
+        private static Collection resolveHostAddresses(Configuration conf)
+        {
+            Set addresses = new HashSet<>();
+
+            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
+            {
+                try
+                {
+                    addresses.add(InetAddress.getByName(host));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return addresses;
+        }
     }
 }
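
----------------------------------------------------------------------
As a usage note, the record writer above buffers each incoming row into a local
CQLSSTableWriter and streams the resulting SSTables when it is closed, now
skipping any hosts registered via setIgnoreHosts. A hypothetical reducer feeding
it could look like the sketch below; the value layout (one ByteBuffer per bind
variable of the configured insert statement) reflects the writer's expected
input, while the reducer class, key types and field handling are assumptions.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class EventReducer extends Reducer<Text, Text, Object, List<ByteBuffer>>
    {
        @Override
        protected void reduce(Text id, Iterable<Text> payloads, Context context)
                throws java.io.IOException, InterruptedException
        {
            for (Text payload : payloads)
            {
                // One serialized value per '?' in "INSERT INTO ks.events (id, payload) VALUES (?, ?)".
                List<ByteBuffer> row = new ArrayList<>();
                row.add(ByteBufferUtil.bytes(id.toString()));
                row.add(ByteBufferUtil.bytes(payload.toString()));

                // The key is not used by the bulk record writer; rows are written to local
                // SSTables and streamed to the cluster when the task's writer is closed.
                context.write(null, row);
            }
        }
    }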