From: brandonwilliams@apache.org
To: commits@cassandra.apache.org
Date: Thu, 14 Aug 2014 21:40:56 -0000
Subject: [2/3] git commit: backport CASSANDRA-6927 to 2.0

backport CASSANDRA-6927 to 2.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44764c03
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44764c03
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44764c03

Branch: refs/heads/cassandra-2.1.0
Commit: 44764c03e892bfdb7294ad32d9ff703909186917
Parents: efde6ae
Author: Brandon Williams
Authored: Wed Aug 13 14:14:38 2014 -0500
Committer: Brandon Williams
Committed: Wed Aug 13 14:14:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |   1 +
 .../hadoop/AbstractBulkOutputFormat.java           |  73 ++++++
 .../hadoop/AbstractBulkRecordWriter.java           | 251 ++++++++++++++++++
 .../cassandra/hadoop/BulkOutputFormat.java         |  49 +---
 .../cassandra/hadoop/BulkRecordWriter.java         | 259 ++-----------------
 .../hadoop/cql3/CqlBulkOutputFormat.java           | 106 ++++++++
 .../hadoop/cql3/CqlBulkRecordWriter.java           | 199 ++++++++++++++
 .../cassandra/hadoop/cql3/CqlConfigHelper.java     |   2 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java     |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java    |  10 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java     |   3 +-
 11 files changed, 663 insertions(+), 292 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d31948..1ac22f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
  * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
  * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
  * Fix MS expiring map timeout for Paxos messages (CASSANDRA-7752)
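For orientation before the diff: the new CQL bulk-load path is driven from an
ordinary Hadoop job driver. A minimal sketch of the wiring, assuming a
hypothetical table ks1.cf1 and local test endpoints; only the
ConfigHelper/CqlBulkOutputFormat calls are from this patch, everything else is
illustrative:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadDriver
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "cql-bulk-load");
            Configuration conf = job.getConfiguration();

            // Destination; setOutputColumnFamily records both keyspace and column family.
            ConfigHelper.setOutputColumnFamily(conf, "ks1", "cf1");
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputRpcPort(conf, "9160");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            // Schema and prepared insert for the bulk writer (added by this patch).
            CqlBulkOutputFormat.setColumnFamilySchema(conf, "cf1",
                "CREATE TABLE ks1.cf1 (k int PRIMARY KEY, v text)");
            CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, "cf1",
                "INSERT INTO ks1.cf1 (k, v) VALUES (?, ?)");

            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            // ... mapper/reducer/input configuration as usual ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }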
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+    implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+        }
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}
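The point of this extraction is that a concrete format now only supplies its
record writers; keyspace validation and the no-op committer are inherited. An
illustrative, purely hypothetical subclass (it mirrors what BulkOutputFormat
does further down):

    import java.io.IOException;

    import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class LoggingBulkOutputFormat extends AbstractBulkOutputFormat<String, String>
    {
        @Override
        public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context)
        {
            // Stand-in writer; a real subclass would build SSTables here.
            return new RecordWriter<String, String>()
            {
                public void write(String key, String value)
                {
                    System.out.println(key + " -> " + value);
                }

                public void close(TaskAttemptContext context) { }
            };
        }

        /** Old-API entry point required by the mapred.OutputFormat half of the base class. */
        @Deprecated
        public org.apache.hadoop.mapred.RecordWriter<String, String> getRecordWriter(
                org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.mapred.JobConf job,
                String name, org.apache.hadoop.util.Progressable progress) throws IOException
        {
            throw new UnsupportedOperationException("streaming path not implemented in this sketch");
        }
    }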
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
+
+    protected AbstractBulkRecordWriter(TaskAttemptContext context)
+    {
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+    {
+        this(conf);
+        this.progress = progress;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf)
+    {
+        Config.setClientMode(true);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
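The four property names above are the writer's tuning knobs, read straight
from the job Configuration. A small sketch of setting them (the values are
illustrative, not recommendations):

    import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
    import org.apache.hadoop.conf.Configuration;

    public class BulkWriterTuning
    {
        public static Configuration tuned()
        {
            Configuration conf = new Configuration();
            conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, "/tmp/bulk-sstables"); // local SSTable build dir
            conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, "128");              // default 64
            conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, "200");          // 0 (default) = unthrottled
            conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, "1");                 // default 0: any failed host aborts
            return conf;
        }
    }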
"64")); + } + + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. */ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + + private void close() throws IOException + { + if (writer != null) + { + writer.close(); + Future future = loader.stream(); + while (true) + { + try + { + future.get(1000, TimeUnit.MILLISECONDS); + break; + } + catch (ExecutionException | TimeoutException te) + { + if (null != progress) + progress.progress(); + if (null != context) + HadoopCompat.progress(context); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + if (loader.getFailedHosts().size() > 0) + { + if (loader.getFailedHosts().size() > maxFailures) + throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); + else + logger.warn("Some hosts failed: {}", loader.getFailedHosts()); + } + } + } + + public static class ExternalClient extends SSTableLoader.Client + { + private final Map> knownCfs = new HashMap<>(); + private final Configuration conf; + private final String hostlist; + private final int rpcPort; + private final String username; + private final String password; + + public ExternalClient(Configuration conf) + { + super(); + this.conf = conf; + this.hostlist = ConfigHelper.getOutputInitialAddress(conf); + this.rpcPort = ConfigHelper.getOutputRpcPort(conf); + this.username = ConfigHelper.getOutputKeyspaceUserName(conf); + this.password = ConfigHelper.getOutputKeyspacePassword(conf); + } + + public void init(String keyspace) + { + Set hosts = new HashSet(); + String[] nodes = hostlist.split(","); + for (String node : nodes) + { + try + { + hosts.add(InetAddress.getByName(node)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + Iterator hostiter = hosts.iterator(); + while (hostiter.hasNext()) + { + try + { + InetAddress host = hostiter.next(); + Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort); + + // log in + client.set_keyspace(keyspace); + if (username != null) + { + Map creds = new HashMap(); + creds.put(IAuthenticator.USERNAME_KEY, username); + creds.put(IAuthenticator.PASSWORD_KEY, password); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + + List tokenRanges = client.describe_ring(keyspace); + List ksDefs = client.describe_keyspaces(); + + setPartitioner(client.describe_partitioner()); + Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); + + for (TokenRange tr : tokenRanges) + { + Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); + for (String ep : tr.endpoints) + { + addRangeForEndpoint(range, InetAddress.getByName(ep)); + } + } + + for (KsDef ksDef : ksDefs) + { + Map cfs = new HashMap<>(ksDef.cf_defs.size()); + for (CfDef cfDef : ksDef.cf_defs) + cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef)); + knownCfs.put(ksDef.name, cfs); + } + break; + } + catch (Exception e) + { + if (!hostiter.hasNext()) + throw new RuntimeException("Could not retrieve 
endpoint ranges: ", e); + } + } + } + + public CFMetaData getCFMetaData(String keyspace, String cfName) + { + Map cfs = knownCfs.get(keyspace); + return cfs != null ? cfs.get(cfName) : null; + } + } + + public static class NullOutputHandler implements OutputHandler + { + public void output(String msg) {} + public void debug(String msg) {} + public void warn(String msg) {} + public void warn(String msg, Throwable th) {} + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index c3d8e05..f5a5a8d 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -23,39 +23,10 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.thrift.Mutation; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; -public class BulkOutputFormat extends OutputFormat> - implements org.apache.hadoop.mapred.OutputFormat> +public class BulkOutputFormat extends AbstractBulkOutputFormat> { - @Override - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - private void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - /** Fills the deprecated OutputFormat interface for streaming. 
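close() is where the work lands on the cluster: the writer is flushed, then
SSTableLoader streams the generated SSTables to the ring that ExternalClient
discovered over Thrift. The same pieces can be driven outside Hadoop; a hedged
sketch (addresses and paths invented, and the directory layout must match what
the record writers produce):

    import java.io.File;
    import java.util.concurrent.Future;

    import org.apache.cassandra.config.Config;
    import org.apache.cassandra.hadoop.AbstractBulkRecordWriter.ExternalClient;
    import org.apache.cassandra.hadoop.AbstractBulkRecordWriter.NullOutputHandler;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.io.sstable.SSTableLoader;
    import org.apache.cassandra.streaming.StreamState;
    import org.apache.hadoop.conf.Configuration;

    public class StandaloneStream
    {
        public static void main(String[] args) throws Exception
        {
            Config.setClientMode(true); // we are a client, not a node

            Configuration conf = new Configuration();
            ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1"); // illustrative contact point
            ConfigHelper.setOutputRpcPort(conf, "9160");

            // Must be <parent>/<keyspace>/<columnfamily>, as the record writers arrange.
            File sstableDir = new File("/tmp/bulk-sstables/ks1/cf1");
            SSTableLoader loader = new SSTableLoader(sstableDir, new ExternalClient(conf), new NullOutputHandler());
            Future<StreamState> future = loader.stream();
            future.get(); // block until streaming completes
        }
    }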
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 
-public class BulkOutputFormat extends OutputFormat<ByteBuffer, List<Mutation>>
-    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer, List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer, List<Mutation>>
 {
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer, List<Mutation>>
     {
         return new BulkRecordWriter(context);
     }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 8bfc958..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.hadoop.util.Progressable;
 
-final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
 {
-    private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-
-    private final Configuration conf;
-    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
-    private SSTableSimpleUnsortedWriter writer;
-    private SSTableLoader loader;
-    private File outputdir;
-    private Progressable progress;
-    private TaskAttemptContext context;
-    private int maxFailures;
-
+    private File outputDir;
+
     private enum CFType
     {
         NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
+        super(context);
     }
 
     BulkRecordWriter(Configuration conf, Progressable progress)
    {
-        this(conf);
-        this.progress = progress;
+        super(conf, progress);
    }
 
     BulkRecordWriter(Configuration conf)
     {
-        Config.setClientMode(true);
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-    }
-
-    private String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
+        super(conf);
     }
 
     private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@
     private void prepareWriter() throws IOException
     {
-        if (outputdir == null)
+        if (outputDir == null)
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
             //dir must be named by ks/cf for the loader
-            outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
-            outputdir.mkdirs();
+            outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+            outputDir.mkdirs();
         }
 
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;
-            ExternalClient externalClient = null;
-            String username = ConfigHelper.getOutputKeyspaceUserName(conf);
-            String password = ConfigHelper.getOutputKeyspacePassword(conf);
 
             if (cfType == CFType.SUPER)
                 subcomparator = BytesType.instance;
 
-            this.writer = new SSTableSimpleUnsortedWriter(
-                    outputdir,
+            writer = new SSTableSimpleUnsortedWriter(
+                    outputDir,
                     ConfigHelper.getOutputPartitioner(conf),
                     ConfigHelper.getOutputKeyspace(conf),
                     ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@
                     Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
                     ConfigHelper.getOutputCompressionParamaters(conf));
 
-            externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
-                    ConfigHelper.getOutputRpcPort(conf),
-                    username,
-                    password);
-
-            this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+            this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
         }
     }
 
@@ -173,36 +119,37 @@
     {
         setTypes(value.get(0));
         prepareWriter();
-        writer.newRow(keybuff);
+        SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+        ssWriter.newRow(keybuff);
         for (Mutation mut : value)
         {
             if (cfType == CFType.SUPER)
             {
-                writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+                ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
                 if (colType == ColType.COUNTER)
                     for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
-                        writer.addCounterColumn(column.name, column.value);
+                        ssWriter.addCounterColumn(column.name, column.value);
                 else
                 {
                     for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
                     {
                         if(column.ttl == 0)
-                            writer.addColumn(column.name, column.value, column.timestamp);
+                            ssWriter.addColumn(column.name, column.value, column.timestamp);
                         else
-                            writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+                            ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
                     }
                 }
             }
             else
             {
                 if (colType == ColType.COUNTER)
-                    writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
+                    ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
                 else
                 {
                     if(mut.getColumn_or_supercolumn().column.ttl == 0)
-                        writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+                        ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
                     else
-                        writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
+                        ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
                 }
             }
             if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
                 HadoopCompat.progress(context);
         }
     }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: " + loader.getFailedHosts());
-            }
-        }
-    }
-
-    static class ExternalClient extends SSTableLoader.Client
-    {
-        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
-        private final String hostlist;
-        private final int rpcPort;
-        private final String username;
-        private final String password;
-
-        public ExternalClient(String hostlist, int port, String username, String password)
-        {
-            super();
-            this.hostlist = hostlist;
-            this.rpcPort = port;
-            this.username = username;
-            this.password = password;
-        }
-
-        public void init(String keyspace)
-        {
-            Set<InetAddress> hosts = new HashSet<InetAddress>();
-            String[] nodes = hostlist.split(",");
-            for (String node : nodes)
-            {
-                try
-                {
-                    hosts.add(InetAddress.getByName(node));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
-                    // log in
-                    client.set_keyspace(keyspace);
-                    if (username != null)
-                    {
-                        Map<String, String> creds = new HashMap<String, String>();
-                        creds.put(IAuthenticator.USERNAME_KEY, username);
-                        creds.put(IAuthenticator.PASSWORD_KEY, password);
-                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                        client.login(authRequest);
-                    }
-
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : tokenRanges)
-                    {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
-                        knownCfs.put(ksDef.name, cfs);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
-                }
-            }
-        }
-
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
-        }
-
-        private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
-        {
-            TSocket socket = new TSocket(host, port);
-            TTransport trans = new TFramedTransport(socket);
-            trans.open();
-            TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
-            return new Cassandra.Client(protocol);
-        }
-    }
-
-    static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
 }
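For contrast with the CQL path added below, this legacy Thrift path consumes
<ByteBuffer, List<Mutation>> pairs. A hedged reducer sketch (field names and
values are made up; the bean setters are the Thrift-generated fluent ones):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;

    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Mutation;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class ThriftBulkReducer extends Reducer<Text, Text, ByteBuffer, List<Mutation>>
    {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            Column col = new Column();
            col.setName(ByteBufferUtil.bytes("value"));
            col.setValue(ByteBufferUtil.bytes(values.iterator().next().toString()));
            col.setTimestamp(System.currentTimeMillis() * 1000); // microseconds

            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(col));

            context.write(ByteBufferUtil.bytes(key.toString()), Collections.singletonList(mutation));
        }
    }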
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
new file mode 100644
index 0000000..58e05b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * bound variable values) as CQL rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link org.apache.cassandra.hadoop.CqlOutputFormat},
+ * you need to set the prepared statement in your Hadoop job Configuration;
+ * the {@link CqlConfigHelper} class, through its
+ * {@link CqlConfigHelper#setOutputCql} method, is provided to make this simple.
+ * You also need to set the keyspace and column family; the {@link ConfigHelper}
+ * class, through its {@link ConfigHelper#setOutputColumnFamily} method, is
+ * provided to make this simple.
+ * </p>
+ */
+public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+{
+
+    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
+    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public CqlBulkRecordWriter getRecordWriter(FileSystem filesystem, JobConf job, String name, Progressable progress) throws IOException
+    {
+        return new CqlBulkRecordWriter(job, progress);
+    }
+
+    /**
+     * Get the {@link RecordWriter} for the given task.
+     *
+     * @param context
+     *            the information about the current task.
+     * @return a {@link RecordWriter} to write the output for the job.
+     * @throws IOException
+     */
+    public CqlBulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new CqlBulkRecordWriter(context);
+    }
+
+    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+    {
+        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
+    }
+
+    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    {
+        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
+    }
+
+    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+    {
+        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
+        if (schema == null)
+        {
+            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+        }
+        return schema;
+    }
+
+    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+    {
+        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
+        if (insert == null)
+        {
+            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilyInsertStatement.");
+        }
+        return insert;
+    }
+}
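Because the schema and insert properties are keyed by column family name, one
job Configuration can carry definitions for several tables and each writer
looks up its own by name. Illustrative only (the tables are hypothetical):

    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;

    public class MultiTableConfig
    {
        public static void configure(Configuration conf)
        {
            CqlBulkOutputFormat.setColumnFamilySchema(conf, "events",
                "CREATE TABLE ks1.events (id uuid PRIMARY KEY, payload text)");
            CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, "events",
                "INSERT INTO ks1.events (id, payload) VALUES (?, ?)");

            CqlBulkOutputFormat.setColumnFamilySchema(conf, "hits_by_day",
                "CREATE TABLE ks1.hits_by_day (day text PRIMARY KEY, hits int)");
            CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, "hits_by_day",
                "INSERT INTO ks1.hits_by_day (day, hits) VALUES (?, ?)");
        }
    }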
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
new file mode 100644
index 0000000..7a75bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
+import org.apache.cassandra.hadoop.BulkRecordWriter;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkRecordWriter</code> maps the output <key, value>
+ * pairs to a Cassandra column family. In particular, it applies the bound
+ * variables in the value to the prepared statement, which it associates with
+ * the key, and in turn the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the CQL queries by the endpoint responsible
+ * for the rows being affected. This allows the CQL queries to be executed in
+ * parallel, directly to a responsible endpoint.
+ * </p>
+ *
+ * @see CqlBulkOutputFormat
+ */
+public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+{
+    private String keyspace;
+    private String columnFamily;
+    private String schema;
+    private String insertStatement;
+    private File outputDir;
+
+    CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
+    {
+        super(context);
+        setConfigs();
+    }
+
+    CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
+    {
+        super(conf, progress);
+        setConfigs();
+    }
+
+    CqlBulkRecordWriter(Configuration conf) throws IOException
+    {
+        super(conf);
+        setConfigs();
+    }
+
+    private void setConfigs() throws IOException
+    {
+        // if anything is missing, exceptions will be thrown here, instead of on write()
+        keyspace = ConfigHelper.getOutputKeyspace(conf);
+        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
+        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
+        outputDir = getColumnFamilyDirectory();
+    }
+
+
+    private void prepareWriter() throws IOException
+    {
+        try
+        {
+            if (writer == null)
+            {
+                writer = CQLSSTableWriter.builder()
+                    .forTable(schema)
+                    .using(insertStatement)
+                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+                    .inDirectory(outputDir)
+                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+                    .build();
+            }
+            if (loader == null)
+            {
+                ExternalClient externalClient = new ExternalClient(conf);
+
+                externalClient.addKnownCfs(keyspace, schema);
+
+                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler());
+            }
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * The column values must correspond to the order in which
+     * they appear in the insert statement.
+     *
+     * Key is not used, so it can be null or any object.
+     * <p>
+     *
+     * @param key
+     *            any object or null.
+     * @param values
+     *            the values to write.
+     * @throws IOException
+     */
+    @Override
+    public void write(Object key, List<ByteBuffer> values) throws IOException
+    {
+        prepareWriter();
+        try
+        {
+            ((CQLSSTableWriter) writer).rawAddRow(values);
+
+            if (null != progress)
+                progress.progress();
+            if (null != context)
+                HadoopCompat.progress(context);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new IOException("Error adding row with key: " + key, e);
+        }
+    }
+
+    private File getColumnFamilyDirectory() throws IOException
+    {
+        File dir = new File(String.format("%s%s%s%s%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily));
+
+        if (!dir.exists() && !dir.mkdirs())
+        {
+            throw new IOException("Failed to create output directory: " + dir);
+        }
+
+        return dir;
+    }
+
+    public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+    {
+        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
+
+        public ExternalClient(Configuration conf)
+        {
+            super(conf);
+        }
+
+        public void addKnownCfs(String keyspace, String cql)
+        {
+            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+
+            if (cfs == null)
+            {
+                cfs = new HashMap<>();
+                knownCqlCfs.put(keyspace, cfs);
+            }
+
+            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
+            cfs.put(metadata.cfName, metadata);
+        }
+
+        @Override
+        public CFMetaData getCFMetaData(String keyspace, String cfName)
+        {
+            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
+            if (metadata != null)
+            {
+                return metadata;
+            }
+
+            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+            return cfs != null ? cfs.get(cfName) : null;
+        }
+    }
+}
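On the task side, a reducer feeding this writer just emits the bound values in
statement order; the key is ignored. A hedged sketch against the hypothetical
ks1.events table from earlier:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.cassandra.utils.UUIDGen;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CqlBulkReducer extends Reducer<Text, Text, Object, List<ByteBuffer>>
    {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            for (Text value : values)
            {
                List<ByteBuffer> bound = Arrays.asList(
                    ByteBuffer.wrap(UUIDGen.decompose(UUID.randomUUID())), // id
                    ByteBufferUtil.bytes(value.toString()));               // payload
                context.write(null, bound); // key is unused by CqlBulkRecordWriter
            }
        }
    }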
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
-    
+
     /**
      * Set the CQL columns for the input of this job.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 7c89bef..5845175 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
- * binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
  * ColumnFamily.
  *
  * <p>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index e337185..db87226 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
 
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
 {
     protected final File directory;
     protected final CFMetaData metadata;
@@ -161,13 +162,6 @@ public abstract class AbstractSSTableSimpleWriter
     }
 
     /**
-     * Close this writer.
-     * This method should be called, otherwise the produced sstables are not
-     * guaranteed to be complete (and won't be in practice).
-     */
-    public abstract void close() throws IOException;
-
-    /**
      * Package protected for use by AbstractCQLSSTableWriter.
      * Not meant to be exposed publicly.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index a7ece70..61990ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
  *     writer.close();
  * </pre>
  */
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
 {
     private final AbstractSSTableSimpleWriter writer;
     private final UpdateStatement insert;
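A side effect of moving close() into Closeable: CQLSSTableWriter can now sit
in a try-with-resources block, and closing is what seals the generated
SSTables. A hedged standalone sketch (table and directory invented; builder
defaults are assumed otherwise):

    import java.io.File;

    import org.apache.cassandra.io.sstable.CQLSSTableWriter;

    public class OfflineSSTableBuild
    {
        public static void main(String[] args) throws Exception
        {
            String schema = "CREATE TABLE ks1.events (id int PRIMARY KEY, payload text)";
            String insert = "INSERT INTO ks1.events (id, payload) VALUES (?, ?)";

            // close() runs automatically and seals the SSTables on disk.
            try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                           .inDirectory(new File("/tmp/ks1/events"))
                                                           .forTable(schema)
                                                           .using(insert)
                                                           .build())
            {
                writer.addRow(1, "first");
                writer.addRow(2, "second");
            }
        }
    }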