cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/3] git commit: Add CqlOutputFormat patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927
Date Wed, 13 Aug 2014 16:57:57 GMT
Add CqlOutputFormat
patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88ad4f45
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88ad4f45
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88ad4f45

Branch: refs/heads/trunk
Commit: 88ad4f4514765c62351ea02553769047a6c1e24c
Parents: 6a7235e
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Wed Aug 13 11:57:43 2014 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed Aug 13 11:57:43 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/AbstractBulkOutputFormat.java        |  73 ++++++
 .../hadoop/AbstractBulkRecordWriter.java        | 251 ++++++++++++++++++
 .../cassandra/hadoop/BulkOutputFormat.java      |  49 +---
 .../cassandra/hadoop/BulkRecordWriter.java      | 259 ++-----------------
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  10 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   3 +-
 9 files changed, 358 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69c4adc..de93018 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
  * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
  * SSTableExport uses correct validator to create string representation of partition
    keys (CASSANDRA-7498)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+    implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+        }
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf
job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+    
+    private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+    
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize; 
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
+    
+    protected AbstractBulkRecordWriter(TaskAttemptContext context)
+    {
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+    {
+        this(conf);
+        this.progress = progress;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf)
+    {
+        Config.setClientMode(true);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS,
"0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
+
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting
java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        if (writer != null)
+        {
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
+            {
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+            }
+        }
+    }
+
+    public static class ExternalClient extends SSTableLoader.Client
+    {
+        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
+        private final Configuration conf;
+        private final String hostlist;
+        private final int rpcPort;
+        private final String username;
+        private final String password;
+
+        public ExternalClient(Configuration conf)
+        {
+          super();
+          this.conf = conf;
+          this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
+          this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
+          this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
+          this.password = ConfigHelper.getOutputKeyspacePassword(conf);
+        }
+
+        public void init(String keyspace)
+        {
+            Set<InetAddress> hosts = new HashSet<InetAddress>();
+            String[] nodes = hostlist.split(",");
+            for (String node : nodes)
+            {
+                try
+                {
+                    hosts.add(InetAddress.getByName(node));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
+            {
+                try
+                {
+                    InetAddress host = hostiter.next();
+                    Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(),
rpcPort);
+
+                    // log in
+                    client.set_keyspace(keyspace);
+                    if (username != null)
+                    {
+                        Map<String, String> creds = new HashMap<String, String>();
+                        creds.put(IAuthenticator.USERNAME_KEY, username);
+                        creds.put(IAuthenticator.PASSWORD_KEY, password);
+                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+                        client.login(authRequest);
+                    }
+
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
+
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token),
tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
+                {
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ",
e);
+                }
+            }
+        }
+
+        public CFMetaData getCFMetaData(String keyspace, String cfName)
+        {
+            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
+            return cfs != null ? cfs.get(cfName) : null;
+        } 
+    }
+
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
-    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
 {
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf
job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf
job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     {
         return new BulkRecordWriter(context);
     }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d6136a2..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.hadoop.util.Progressable;
 
-final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
 {
-    private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-    private final Configuration conf;
-    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
-    private SSTableSimpleUnsortedWriter writer;
-    private SSTableLoader loader;
-    private File outputdir;
-    private Progressable progress;
-    private TaskAttemptContext context;
-    private int maxFailures;
-
+    private File outputDir;
+    
+    
     private enum CFType
     {
         NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
+        super(context);
     }
 
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
-        this(conf);
-        this.progress = progress;
+        super(conf, progress);
     }
 
     BulkRecordWriter(Configuration conf)
     {
-        Config.setClientMode(true);
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS,
"0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-    }
-
-    private String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting
java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
+        super(conf);
     }
 
     private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     private void prepareWriter() throws IOException
     {
-        if (outputdir == null)
+        if (outputDir == null)
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
             //dir must be named by ks/cf for the loader
-            outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator
+ ConfigHelper.getOutputColumnFamily(conf));
-            outputdir.mkdirs();
+            outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator
+ ConfigHelper.getOutputColumnFamily(conf));
+            outputDir.mkdirs();
         }
         
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;
-            ExternalClient externalClient = null;
-            String username = ConfigHelper.getOutputKeyspaceUserName(conf);
-            String password = ConfigHelper.getOutputKeyspacePassword(conf);
 
             if (cfType == CFType.SUPER)
                 subcomparator = BytesType.instance;
 
-            this.writer = new SSTableSimpleUnsortedWriter(
-                    outputdir,
+            writer = new SSTableSimpleUnsortedWriter(
+                    outputDir,
                     ConfigHelper.getOutputPartitioner(conf),
                     ConfigHelper.getOutputKeyspace(conf),
                     ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
                     ConfigHelper.getOutputCompressionParamaters(conf));
 
-            externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
-                                                ConfigHelper.getOutputRpcPort(conf),
-                                                username,
-                                                password);
-
-            this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+            this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
         }
     }
 
@@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     {
         setTypes(value.get(0));
         prepareWriter();
-        writer.newRow(keybuff);
+        SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+        ssWriter.newRow(keybuff);
         for (Mutation mut : value)
         {
             if (cfType == CFType.SUPER)
             {
-                writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+                ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
                 if (colType == ColType.COUNTER)
                     for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
-                        writer.addCounterColumn(column.name, column.value);
+                        ssWriter.addCounterColumn(column.name, column.value);
                 else
                 {
                     for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
                     {
                         if(column.ttl == 0)
-                            writer.addColumn(column.name, column.value, column.timestamp);
+                            ssWriter.addColumn(column.name, column.value, column.timestamp);
                         else
-                            writer.addExpiringColumn(column.name, column.value, column.timestamp,
column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+                            ssWriter.addExpiringColumn(column.name, column.value, column.timestamp,
column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
                     }
                 }
             }
             else
             {
                 if (colType == ColType.COUNTER)
-                    writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name,
mut.getColumn_or_supercolumn().counter_column.value);
+                    ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name,
mut.getColumn_or_supercolumn().counter_column.value);
                 else
                 {
                     if(mut.getColumn_or_supercolumn().column.ttl == 0)
-                        writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value,
mut.getColumn_or_supercolumn().column.timestamp);
+                        ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value,
mut.getColumn_or_supercolumn().column.timestamp);
                     else
-                        writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name,
mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp,
mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl)
* 1000));
+                        ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name,
mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp,
mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl)
* 1000));
                 }
             }
             if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 HadoopCompat.progress(context);
         }
     }
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
-            }
-        }
-    }
-
-    static class ExternalClient extends SSTableLoader.Client
-    {
-        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
-        private final String hostlist;
-        private final int rpcPort;
-        private final String username;
-        private final String password;
-
-        public ExternalClient(String hostlist, int port, String username, String password)
-        {
-            super();
-            this.hostlist = hostlist;
-            this.rpcPort = port;
-            this.username = username;
-            this.password = password;
-        }
-
-        public void init(String keyspace)
-        {
-            Set<InetAddress> hosts = new HashSet<InetAddress>();
-            String[] nodes = hostlist.split(",");
-            for (String node : nodes)
-            {
-                try
-                {
-                    hosts.add(InetAddress.getByName(node));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
-                    // log in
-                    client.set_keyspace(keyspace);
-                    if (username != null)
-                    {
-                        Map<String, String> creds = new HashMap<String, String>();
-                        creds.put(IAuthenticator.USERNAME_KEY, username);
-                        creds.put(IAuthenticator.PASSWORD_KEY, password);
-                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                        client.login(authRequest);
-                    }
-
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : tokenRanges)
-                    {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token),
tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
-                        knownCfs.put(ksDef.name, cfs);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ",
e);
-                }
-            }
-        }
-
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
-        }
-
-        private static Cassandra.Client createThriftClient(String host, int port) throws
TTransportException
-        {
-            TSocket socket = new TSocket(host, port);
-            TTransport trans = new TFramedTransport(socket);
-            trans.open();
-            TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
-            return new Cassandra.Client(protocol);
-        }
-    }
-
-    static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
-
+    
     /**
      * Set the CQL columns for the input of this job.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index f8613ba..0d09ca2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
- *  binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
  * ColumnFamily.
  *
  * <p>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 1b407c5..ae8300c 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
 
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
 {
     protected final File directory;
     protected final CFMetaData metadata;
@@ -162,13 +163,6 @@ public abstract class AbstractSSTableSimpleWriter
     }
 
     /**
-     * Close this writer.
-     * This method should be called, otherwise the produced sstables are not
-     * guaranteed to be complete (and won't be in practice).
-     */
-    public abstract void close() throws IOException;
-
-    /**
      * Package protected for use by AbstractCQLSSTableWriter.
      * Not meant to be exposed publicly.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 6993b19..427d2d4 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
  *   writer.close();
  * </pre>
  */
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
 {
     private final AbstractSSTableSimpleWriter writer;
     private final UpdateStatement insert;


Mime
View raw message