cassandra-commits mailing list archives

From alek...@apache.org
Subject [3/4] cassandra git commit: Remove deprecated legacy Hadoop code
Date Mon, 08 Jun 2015 19:43:42 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
deleted file mode 100644
index 92e3829..0000000
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.*;
-
-import org.apache.cassandra.auth.*;
-import org.apache.cassandra.thrift.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.*;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * values) as Cassandra rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * Keyspace and ColumnFamily in your
- * Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
- * </p>
- */
-@Deprecated
-public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
-        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
-{
-    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
-    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
-
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
-
-    /**
-     * The OutputCommitter for this format does not write any data to the DFS.
-     *
-     * @param context
-     *            the task context
-     * @return an output committer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /**
-     * Check for validity of the output-specification for the job.
-     *
-     * @param context
-     *            information about the job
-     */
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    protected void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    /**
-     * Connects to the given server:port and returns a client based on the given socket that points to the configured
-     * keyspace, and is logged in with the configured credentials.
-     *
-     * @param host fully qualified host name to connect to
-     * @param port RPC port of the server
-     * @param conf a job configuration
-     * @return a cassandra client
-     * @throws Exception set of thrown exceptions may be implementation defined,
-     *                   depending on the used transport factory
-     */
-    @SuppressWarnings("resource")
-    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
-        String password = ConfigHelper.getOutputKeyspacePassword(conf);
-        if ((user != null) && (password != null))
-            login(user, password, client);
-
-        logger.debug("Authenticated client for CF output format created successfully");
-        return client;
-    }
-
-    public static void login(String user, String password, Cassandra.Client client) throws Exception
-    {
-        Map<String, String> creds = new HashMap<String, String>();
-        creds.put(PasswordAuthenticator.USERNAME_KEY, user);
-        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
-        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-        client.login(authRequest);
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
-    {
-        return new ColumnFamilyRecordWriter(job, progress);
-    }
-
-    /**
-     * Get the {@link RecordWriter} for the given task.
-     *
-     * @param context
-     *            the information about the current task.
-     * @return a {@link RecordWriter} to write the output for the job.
-     */
-    public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws InterruptedException
-    {
-        return new ColumnFamilyRecordWriter(context);
-    }
-
-    /**
-     * An {@link OutputCommitter} that does nothing.
-     */
-    private static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-
-}

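For context, the removed ColumnFamilyOutputFormat's javadoc pointed users at ConfigHelper for job setup, and its checkOutputSpecs() required an output keyspace, partitioner and initial contact address. A minimal sketch of how a job was typically wired against this Thrift-based format is shown below; the keyspace/table names, contact address and partitioner class are illustrative assumptions, not part of this commit.

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LegacyOutputJobSetup
{
    // Hypothetical keyspace, table and contact point, used only for illustration.
    public static Job configure(Configuration conf) throws Exception
    {
        Job job = Job.getInstance(conf, "legacy-cf-output");
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

        Configuration jobConf = job.getConfiguration();
        // These three settings correspond to the checks in checkOutputSpecs() above.
        ConfigHelper.setOutputColumnFamily(jobConf, "MyKeyspace", "users");
        ConfigHelper.setOutputInitialAddress(jobConf, "127.0.0.1");
        ConfigHelper.setOutputPartitioner(jobConf, "org.apache.cassandra.dht.Murmur3Partitioner");
        return job;
    }
}
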
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
deleted file mode 100644
index aee730d..0000000
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-
-@Deprecated
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>>
-{
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
-
-    public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
-
-    private ColumnFamilySplit split;
-    private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
-    private SlicePredicate predicate;
-    private boolean isEmptyPredicate;
-    private int totalRowCount; // total number of rows to fetch
-    private int batchSize; // fetch this many per batch
-    private String keyspace;
-    private String cfName;
-    private Cassandra.Client client;
-    private ConsistencyLevel consistencyLevel;
-    private int keyBufferSize = 8192;
-    private List<IndexExpression> filter;
-
-
-    public ColumnFamilyRecordReader()
-    {
-        this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT);
-    }
-
-    public ColumnFamilyRecordReader(int keyBufferSize)
-    {
-        super();
-        this.keyBufferSize = keyBufferSize;
-    }
-
-    @SuppressWarnings("resource")
-    public void close()
-    {
-        if (client != null)
-        {
-            TTransport transport = client.getOutputProtocol().getTransport();
-            if (transport.isOpen())
-                transport.close();
-        }
-    }
-
-    public ByteBuffer getCurrentKey()
-    {
-        return currentRow.left;
-    }
-
-    public SortedMap<ByteBuffer, Column> getCurrentValue()
-    {
-        return currentRow.right;
-    }
-
-    public float getProgress()
-    {
-        if (!iter.hasNext())
-            return 1.0F;
-
-        // the progress is likely to be reported slightly off the actual but close enough
-        float progress = ((float) iter.rowsRead() / totalRowCount);
-        return progress > 1.0F ? 1.0F : progress;
-    }
-
-    static boolean isEmptyPredicate(SlicePredicate predicate)
-    {
-        if (predicate == null)
-            return true;
-
-        if (predicate.isSetColumn_names() && predicate.getSlice_range() == null)
-            return false;
-
-        if (predicate.getSlice_range() == null)
-            return true;
-
-        byte[] start = predicate.getSlice_range().getStart();
-        if ((start != null) && (start.length > 0))
-            return false;
-
-        byte[] finish = predicate.getSlice_range().getFinish();
-        if ((finish != null) && (finish.length > 0))
-            return false;
-
-        return true;
-    }
-
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
-    {
-        this.split = (ColumnFamilySplit) split;
-        Configuration conf = HadoopCompat.getConfiguration(context);
-        KeyRange jobRange = ConfigHelper.getInputKeyRange(conf);
-        filter = jobRange == null ? null : jobRange.row_filter;
-        predicate = ConfigHelper.getInputSlicePredicate(conf);
-        boolean widerows = ConfigHelper.getInputIsWide(conf);
-        isEmptyPredicate = isEmptyPredicate(predicate);
-        totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
-                ? (int) this.split.getLength()
-                : ConfigHelper.getInputSplitSize(conf);
-        batchSize = ConfigHelper.getRangeBatchSize(conf);
-        cfName = ConfigHelper.getInputColumnFamily(conf);
-        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
-        keyspace = ConfigHelper.getInputKeyspace(conf);
-        
-        if (batchSize < 2)
-            throw new IllegalArgumentException("Minimum batchSize is 2.  Suggested batchSize is 100 or more");
-
-        try
-        {
-            if (client != null)
-                return;
-
-            // create connection using thrift
-            String location = getLocation();
-
-            int port = ConfigHelper.getInputRpcPort(conf);
-            client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf);
-
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        iter = widerows ? new WideRowIterator() : new StaticRowIterator();
-        logger.debug("created {}", iter);
-    }
-
-    public boolean nextKeyValue() throws IOException
-    {
-        if (!iter.hasNext())
-        {
-            logger.debug("Finished scanning {} rows (estimate was: {})", iter.rowsRead(), totalRowCount);
-            return false;
-        }
-
-        currentRow = iter.next();
-        return true;
-    }
-
-    // we don't use endpointsnitch since we are trying to support hadoop nodes that are
-    // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
-    private String getLocation()
-    {
-        Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses();
-
-        for (InetAddress address : localAddresses)
-        {
-            for (String location : split.getLocations())
-            {
-                InetAddress locationAddress = null;
-                try
-                {
-                    locationAddress = InetAddress.getByName(location);
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new AssertionError(e);
-                }
-                if (address.equals(locationAddress))
-                {
-                    return location;
-                }
-            }
-        }
-        return split.getLocations()[0];
-    }
-
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
-    {
-        protected List<KeySlice> rows;
-        protected int totalRead = 0;
-        protected final boolean isSuper;
-        protected final AbstractType<?> comparator;
-        protected final AbstractType<?> subComparator;
-        protected final IPartitioner partitioner;
-
-        private RowIterator()
-        {
-            CfDef cfDef = new CfDef();
-            try
-            {
-                partitioner = FBUtilities.newPartitioner(client.describe_partitioner());           
-                // get CF meta data
-                String query = String.format("SELECT comparator, subcomparator, type " +
-                                             "FROM %s.%s " +
-                                             "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                             SystemKeyspace.NAME,
-                                             LegacySchemaTables.COLUMNFAMILIES,
-                                             keyspace,
-                                             cfName);
-
-                CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-                Iterator<CqlRow> iteraRow = result.rows.iterator();
-
-                if (iteraRow.hasNext())
-                {
-                    CqlRow cqlRow = iteraRow.next();
-                    cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
-                    ByteBuffer subComparator = cqlRow.columns.get(1).value;
-                    if (subComparator != null)
-                        cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
-                    
-                    ByteBuffer type = cqlRow.columns.get(2).value;
-                    if (type != null)
-                        cfDef.column_type = ByteBufferUtil.string(type);
-                }
-
-                comparator = TypeParser.parse(cfDef.comparator_type);
-                subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
-            }
-            catch (ConfigurationException e)
-            {
-                throw new RuntimeException("unable to load sub/comparator", e);
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException("error communicating via Thrift", e);
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException("unable to load keyspace " + keyspace, e);
-            }
-            isSuper = "Super".equalsIgnoreCase(cfDef.column_type);
-        }
-
-        /**
-         * @return total number of rows read by this record reader
-         */
-        public int rowsRead()
-        {
-            return totalRead;
-        }
-
-        protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc)
-        {
-            if (cosc.counter_column != null)
-                return Collections.singletonList(unthriftifyCounter(cosc.counter_column));
-            if (cosc.counter_super_column != null)
-                return unthriftifySuperCounter(cosc.counter_super_column);
-            if (cosc.super_column != null)
-                return unthriftifySuper(cosc.super_column);
-            assert cosc.column != null;
-            return Collections.singletonList(unthriftifySimple(cosc.column));
-        }
-
-        private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column)
-        {
-            List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
-            for (org.apache.cassandra.thrift.Column column : super_column.columns)
-            {
-                Pair<ByteBuffer, Column> c = unthriftifySimple(column);
-                columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
-            }
-            return columns;
-        }
-
-        protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column)
-        {
-            return Pair.create(column.name, Column.fromRegularColumn(column));
-        }
-
-        private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column)
-        {
-            return Pair.create(column.name, Column.fromCounterColumn(column));
-        }
-
-        private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column)
-        {
-            List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size());
-            for (CounterColumn column : super_column.columns)
-            {
-                Pair<ByteBuffer, Column> c = unthriftifyCounter(column);
-                columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right));
-            }
-            return columns;
-        }
-    }
-
-    private class StaticRowIterator extends RowIterator
-    {
-        protected int i = 0;
-
-        private void maybeInit()
-        {
-            // check if we need another batch
-            if (rows != null && i < rows.size())
-                return;
-
-            String startToken;
-            if (totalRead == 0)
-            {
-                // first request
-                startToken = split.getStartToken();
-            }
-            else
-            {
-                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key));
-                if (startToken.equals(split.getEndToken()))
-                {
-                    // reached end of the split
-                    rows = null;
-                    return;
-                }
-            }
-
-            KeyRange keyRange = new KeyRange(batchSize)
-                                .setStart_token(startToken)
-                                .setEnd_token(split.getEndToken())
-                                .setRow_filter(filter);
-            try
-            {
-                rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
-
-                // nothing new? reached the end
-                if (rows.isEmpty())
-                {
-                    rows = null;
-                    return;
-                }
-
-                // remove ghosts when fetching all columns
-                if (isEmptyPredicate)
-                {
-                    Iterator<KeySlice> it = rows.iterator();
-                    KeySlice ks;
-                    do
-                    {
-                        ks = it.next();
-                        if (ks.getColumnsSize() == 0)
-                        {
-                            it.remove();
-                        }
-                    } while (it.hasNext());
-
-                    // all ghosts, spooky
-                    if (rows.isEmpty())
-                    {
-                        // maybeInit assumes it can get the start-with key from the rows collection, so add back the last
-                        rows.add(ks);
-                        maybeInit();
-                        return;
-                    }
-                }
-
-                // reset to iterate through this new batch
-                i = 0;
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
-        {
-            maybeInit();
-            if (rows == null)
-                return endOfData();
-
-            totalRead++;
-            KeySlice ks = rows.get(i++);
-            AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
-            SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp);
-            for (ColumnOrSuperColumn cosc : ks.columns)
-            {
-                List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
-                for (Pair<ByteBuffer, Column> column : columns)
-                    map.put(column.left, column.right);
-            }
-            return Pair.create(ks.key, map);
-        }
-    }
-
-    private class WideRowIterator extends RowIterator
-    {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
-        private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        private void maybeInit()
-        {
-            if (wideColumns != null && wideColumns.hasNext())
-                return;
-
-            KeyRange keyRange;
-            if (totalRead == 0)
-            {
-                String startToken = split.getStartToken();
-                keyRange = new KeyRange(batchSize)
-                          .setStart_token(startToken)
-                          .setEnd_token(split.getEndToken())
-                          .setRow_filter(filter);
-            }
-            else
-            {
-                KeySlice lastRow = Iterables.getLast(rows);
-                logger.debug("Starting with last-seen row {}", lastRow.key);
-                keyRange = new KeyRange(batchSize)
-                          .setStart_key(lastRow.key)
-                          .setEnd_token(split.getEndToken())
-                          .setRow_filter(filter);
-            }
-
-            try
-            {
-                rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel);
-                int n = 0;
-                for (KeySlice row : rows)
-                    n += row.columns.size();
-                logger.debug("read {} columns in {} rows for {} starting with {}",
-                             new Object[]{ n, rows.size(), keyRange, lastColumn });
-
-                wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows));
-                if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(lastColumn))
-                    wideColumns.next();
-                if (!wideColumns.hasNext())
-                    rows = null;
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
-        {
-            maybeInit();
-            if (rows == null)
-                return endOfData();
-
-            Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
-            lastColumn = next.right.keySet().iterator().next().duplicate();
-
-            maybeIncreaseRowCounter(next);
-            return next;
-        }
-
-
-        /**
-         * Increases the row counter only if we really moved to the next row.
-         * @param next just fetched row slice
-         */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
-        {
-            ByteBuffer currentKey = next.left;
-            if (!currentKey.equals(lastCountedKey))
-            {
-                totalRead++;
-                lastCountedKey = currentKey;
-            }
-        }
-
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
-        {
-            private final Iterator<KeySlice> rows;
-            private Iterator<ColumnOrSuperColumn> columns;
-            public KeySlice currentRow;
-
-            public WideColumnIterator(List<KeySlice> rows)
-            {
-                this.rows = rows.iterator();
-                if (this.rows.hasNext())
-                    nextRow();
-                else
-                    columns = Iterators.emptyIterator();
-            }
-
-            private void nextRow()
-            {
-                currentRow = rows.next();
-                columns = currentRow.columns.iterator();
-            }
-
-            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
-            {
-                AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
-                while (true)
-                {
-                    if (columns.hasNext())
-                    {
-                        ColumnOrSuperColumn cosc = columns.next();
-                        SortedMap<ByteBuffer, Column> map;
-                        List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc);
-                        if (columns.size() == 1)
-                        {
-                            map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right);
-                        }
-                        else
-                        {
-                            assert isSuper;
-                            map = new TreeMap<>(comp);
-                            for (Pair<ByteBuffer, Column> column : columns)
-                                map.put(column.left, column.right);
-                        }
-                        return Pair.create(currentRow.key, map);
-                    }
-
-                    if (!rows.hasNext())
-                        return endOfData();
-
-                    nextRow();
-                }
-            }
-        }
-    }
-
-    // Because the old Hadoop API wants us to write to the key and value
-    // and the new asks for them, we need to copy the output of the new API
-    // to the old. Thus, expect a small performance hit.
-    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
-    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
-    {
-        if (this.nextKeyValue())
-        {
-            key.clear();
-            key.put(this.getCurrentKey().duplicate());
-            key.flip();
-
-            value.clear();
-            value.putAll(this.getCurrentValue());
-
-            return true;
-        }
-        return false;
-    }
-
-    public ByteBuffer createKey()
-    {
-        return ByteBuffer.wrap(new byte[this.keyBufferSize]);
-    }
-
-    public SortedMap<ByteBuffer, Column> createValue()
-    {
-        return new TreeMap<>();
-    }
-
-    public long getPos() throws IOException
-    {
-        return iter.rowsRead();
-    }
-
-    public static final class Column
-    {
-        public final ByteBuffer name;
-        public final ByteBuffer value;
-        public final long timestamp;
-
-        private Column(ByteBuffer name, ByteBuffer value, long timestamp)
-        {
-            this.name = name;
-            this.value = value;
-            this.timestamp = timestamp;
-        }
-
-        static Column fromRegularColumn(org.apache.cassandra.thrift.Column input)
-        {
-            return new Column(input.name, input.value, input.timestamp);
-        }
-
-        static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input)
-        {
-            return new Column(input.name, ByteBufferUtil.bytes(input.value), 0);
-        }
-    }
-}

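On the input side, initialize() above pulls everything it needs from ConfigHelper: keyspace, column family, slice predicate, wide-row flag, range batch size, RPC port and read consistency level. A rough sketch of the matching job configuration follows, assuming the usual ConfigHelper setters; the keyspace, table, address and sizes are placeholder values for illustration only.

import java.nio.ByteBuffer;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;

public class LegacyInputJobSetup
{
    public static Configuration configure()
    {
        Configuration conf = new Configuration();
        ConfigHelper.setInputColumnFamily(conf, "MyKeyspace", "users");  // keyspace + column family
        ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");          // contact node
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        ConfigHelper.setInputRpcPort(conf, "9160");                      // Thrift port read in initialize()
        ConfigHelper.setRangeBatchSize(conf, 100);                       // must be >= 2, per the check above

        // A full-slice ("empty") predicate triggers the ghost-row filtering in StaticRowIterator.
        SlicePredicate predicate = new SlicePredicate().setSlice_range(
                new SliceRange(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, Integer.MAX_VALUE));
        ConfigHelper.setInputSlicePredicate(conf, predicate);
        return conf;
    }
}
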
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
deleted file mode 100644
index 9547d0e..0000000
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.client.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.utils.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TException;
-import org.apache.hadoop.util.Progressable;
-import org.apache.thrift.transport.*;
-
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
- *
- * <p>
- * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-@Deprecated
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements
-        org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
-{
-    // The configuration this writer is associated with.
-    protected final Configuration conf;
-
-    // The number of mutations to buffer per endpoint
-    protected final int queueSize;
-
-    protected final long batchThreshold;
-
-    protected final ConsistencyLevel consistencyLevel;
-    protected Progressable progressable;
-    protected TaskAttemptContext context;
-    // handles for clients for each range running in the threadpool
-    private final Map<Range, RangeClient> clients;
-
-    // The ring cache that describes the token ranges each node in the ring is
-    // responsible for. This is what allows us to group the mutations by
-    // the endpoints they should be targeted at. The targeted endpoint
-    // essentially
-    // acts as the primary replica for the rows being affected by the mutations.
-    private final RingCache ringCache;
-    
-    /**
-     * Upon construction, obtain the map that this writer will use to collect
-     * mutations, and the ring cache for the given keyspace.
-     *
-     * @param context the task attempt context
-     * @throws IOException
-     */
-    ColumnFamilyRecordWriter(TaskAttemptContext context)
-    {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
-
-    }
-    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable)
-    {
-        this(conf);
-        this.progressable = progressable;
-    }
-
-    ColumnFamilyRecordWriter(Configuration conf)
-    {
-        this.conf = conf;
-        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
-        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
-        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
-        this.ringCache = new RingCache(conf);
-        this.clients = new HashMap<Range, RangeClient>();
-    }
-
-    /**
-     * Close this <code>RecordWriter</code> to future operations, but not before
-     * flushing out the batched mutations.
-     *
-     * @param context the context of the task
-     * @throws IOException
-     */
-    public void close(TaskAttemptContext context) throws IOException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    public void close() throws IOException
-    {
-        // close all the clients before throwing anything
-        IOException clientException = null;
-        for (RangeClient client : clients.values())
-        {
-            try
-            {
-                client.close();
-            }
-            catch (IOException e)
-            {
-                clientException = e;
-            }
-        }
-        if (clientException != null)
-            throw clientException;
-    }
-    
-    /**
-     * If the key is to be associated with a valid value, a mutation is created
-     * for it with the given column family and columns. In the event the value
-     * in the column is missing (i.e., null), then it is marked for
-     * {@link Deletion}. Similarly, if the entire value for a key is missing
-     * (i.e., null), then the entire key is marked for {@link Deletion}.
-     * </p>
-     *
-     * @param keybuff
-     *            the key to write.
-     * @param value
-     *            the value to write.
-     * @throws IOException
-     */
-    @Override
-    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
-    {
-        Range<Token> range = ringCache.getRange(keybuff);
-
-        // get the client for the given range, or create a new one
-        RangeClient client = clients.get(range);
-        if (client == null)
-        {
-            // haven't seen keys for this range: create new client
-            client = new RangeClient(ringCache.getEndpoint(range));
-            client.start();
-            clients.put(range, client);
-        }
-
-        for (Mutation amut : value)
-            client.put(Pair.create(keybuff, amut));
-        if (progressable != null)
-            progressable.progress();
-        if (context != null)
-            HadoopCompat.progress(context);
-    }
-
-    /**
-     * A client that runs in a threadpool and connects to the list of endpoints for a particular
-     * range. Mutations for keys in that range are sent to this client via a queue.
-     */
-    public class RangeClient extends Thread
-    {
-        // The list of endpoints for this range
-        protected final List<InetAddress> endpoints;
-        // A bounded queue of incoming mutations for this range
-        protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize);
-
-        protected volatile boolean run = true;
-        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
-        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
-        // when the client is closed.
-        protected volatile IOException lastException;
-
-        protected Cassandra.Client client;
-        public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
-        
-        /**
-        * Constructs an {@link RangeClient} for the given endpoints.
-        * @param endpoints the possible endpoints to execute the mutations on
-        */
-        public RangeClient(List<InetAddress> endpoints)
-        {
-            super("client-" + endpoints);
-            this.endpoints = endpoints;
-         }
-
-        /**
-         * enqueues the given value to Cassandra
-         */
-        public void put(Pair<ByteBuffer, Mutation> value) throws IOException
-        {
-            while (true)
-            {
-                if (lastException != null)
-                    throw lastException;
-                try
-                {
-                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
-                        break;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
-        }
-
-        public void close() throws IOException
-        {
-            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
-            run = false;
-            interrupt();
-            try
-            {
-                this.join();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            if (lastException != null)
-                throw lastException;
-        }
-
-        @SuppressWarnings("resource")
-        protected void closeInternal()
-        {
-            if (client != null)
-            {
-                TTransport transport = client.getOutputProtocol().getTransport();
-                if (transport.isOpen())
-                    transport.close();
-            }
-        }
-        
-        /**
-         * Loops collecting mutations from the queue and sending to Cassandra
-         */
-        public void run()
-        {
-            outer:
-            while (run || !queue.isEmpty())
-            {
-                Pair<ByteBuffer, Mutation> mutation;
-                try
-                {
-                    mutation = queue.take();
-                }
-                catch (InterruptedException e)
-                {
-                    // re-check loop condition after interrupt
-                    continue;
-                }
-
-                Map<ByteBuffer, Map<String, List<Mutation>>> batch = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-                while (mutation != null)
-                {
-                    Map<String, List<Mutation>> subBatch = batch.get(mutation.left);
-                    if (subBatch == null)
-                    {
-                        subBatch = Collections.singletonMap(columnFamily, (List<Mutation>) new ArrayList<Mutation>());
-                        batch.put(mutation.left, subBatch);
-                    }
-
-                    subBatch.get(columnFamily).add(mutation.right);
-                    if (batch.size() >= batchThreshold)
-                        break;
-
-                    mutation = queue.poll();
-                }
-
-                Iterator<InetAddress> iter = endpoints.iterator();
-                while (true)
-                {
-                    // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                    try
-                    {
-                        client.batch_mutate(batch, consistencyLevel);
-                        break;
-                    }
-                    catch (Exception e)
-                    {
-                        closeInternal();
-                        if (!iter.hasNext())
-                        {
-                            lastException = new IOException(e);
-                            break outer;
-                        }
-                    }
-
-                    // attempt to connect to a different endpoint
-                    try
-                    {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        int port = ConfigHelper.getOutputRpcPort(conf);
-                        client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
-                    }
-                    catch (Exception e)
-                    {
-                        closeInternal();
-                        // TException means something unexpected went wrong to that endpoint, so
-                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                        if ((!(e instanceof TException)) || !iter.hasNext())
-                        {
-                            lastException = new IOException(e);
-                            break outer;
-                        }
-                    }
-                }
-            }
-        }
-    }
-}

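The heart of the removed writer is RangeClient's queue-and-batch loop: callers enqueue mutations with a timed offer() so a failed background thread can re-throw its error on the caller's stack, and the thread drains the queue into batches bounded by BATCH_THRESHOLD before flushing. A stripped-down, self-contained sketch of that pattern follows; the class and method names are invented for illustration, with a println standing in for batch_mutate().

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchingWriter extends Thread
{
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(128); // cf. QUEUE_SIZE
    private final int batchThreshold = 32;                                     // cf. BATCH_THRESHOLD
    private volatile boolean run = true;
    private volatile IOException lastException;

    public void put(String value) throws IOException
    {
        while (true)
        {
            if (lastException != null)
                throw lastException;                        // surface the writer thread's failure
            try
            {
                if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
                    break;
            }
            catch (InterruptedException e)
            {
                throw new AssertionError(e);
            }
        }
    }

    public void close() throws IOException
    {
        run = false;
        interrupt();
        try { join(); } catch (InterruptedException e) { throw new AssertionError(e); }
        if (lastException != null)
            throw lastException;
    }

    @Override
    public void run()
    {
        while (run || !queue.isEmpty())
        {
            String first;
            try { first = queue.take(); }
            catch (InterruptedException e) { continue; }    // re-check the loop condition

            List<String> batch = new ArrayList<>();
            batch.add(first);
            String next;
            while (batch.size() < batchThreshold && (next = queue.poll()) != null)
                batch.add(next);

            try
            {
                send(batch);                                // stand-in for batch_mutate()
            }
            catch (IOException e)
            {
                lastException = e;
                return;
            }
        }
    }

    protected void send(List<String> batch) throws IOException
    {
        System.out.println("flushing " + batch.size() + " item(s)");
    }
}
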
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e77c4c8..3e69c2d 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.BulkRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
@@ -41,6 +40,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.utils.NativeSSTableLoaderClient;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -152,7 +152,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
             ExternalClient externalClient = new ExternalClient(conf);
             externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
 
-            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+            loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
             {
                 @Override
                 public void onSuccess(StreamState finalState)
@@ -287,4 +287,12 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
             return addresses;
         }
     }
+
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 09bd80c..70429a8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,12 +18,44 @@
 package org.apache.cassandra.hadoop.cql3;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -48,8 +80,15 @@ import com.datastax.driver.core.Row;
  *   
  *   other native protocol connection parameters in CqlConfigHelper
  */
-public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
+public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long, Row> implements org.apache.hadoop.mapred.InputFormat<Long, Row>
 {
+    public static final String MAPRED_TASK_ID = "mapred.task.id";
+    private static final Logger logger = LoggerFactory.getLogger(CqlInputFormat.class);
+    private String keyspace;
+    private String cfName;
+    private IPartitioner partitioner;
+    private Session session;
+
     public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
             throws IOException
     {
@@ -75,4 +114,238 @@ public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row>
         return new CqlRecordReader();
     }
 
+    protected void validateConfiguration(Configuration conf)
+    {
+        if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()");
+        }
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
+    }
+
+    public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext context) throws IOException
+    {
+        Configuration conf = HadoopCompat.getConfiguration(context);
+
+        validateConfiguration(conf);
+
+        keyspace = ConfigHelper.getInputKeyspace(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        logger.debug("partitioner is {}", partitioner);
+
+        // canonical ranges and nodes holding replicas
+        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
+
+        // canonical ranges, split into pieces, fetching the splits in parallel
+        ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
+
+        try
+        {
+            List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
+            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+            Range<Token> jobRange = null;
+            if (jobKeyRange != null)
+            {
+                if (jobKeyRange.start_key != null)
+                {
+                    if (!partitioner.preservesOrder())
+                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
+                    if (jobKeyRange.start_token != null)
+                        throw new IllegalArgumentException("only start_key supported");
+                    if (jobKeyRange.end_token != null)
+                        throw new IllegalArgumentException("only start_key supported");
+                    jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
+                                           partitioner.getToken(jobKeyRange.end_key));
+                }
+                else if (jobKeyRange.start_token != null)
+                {
+                    jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                           partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
+                }
+                else
+                {
+                    logger.warn("ignoring jobKeyRange specified without start_key or start_token");
+                }
+            }
+
+            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+            Metadata metadata = session.getCluster().getMetadata();
+
+            for (TokenRange range : masterRangeNodes.keySet())
+            {
+                if (jobRange == null)
+                {
+                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                }
+                else
+                {
+                    TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+                    if (range.intersects(jobTokenRange))
+                    {
+                        for (TokenRange intersection: range.intersectWith(jobTokenRange))
+                        {
+                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
+                        }
+                    }
+                }
+            }
+
+            // wait until we have all the results back
+            for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
+            {
+                try
+                {
+                    splits.addAll(futureInputSplits.get());
+                }
+                catch (Exception e)
+                {
+                    throw new IOException("Could not get input splits", e);
+                }
+            }
+        }
+        finally
+        {
+            executor.shutdownNow();
+        }
+
+        assert splits.size() > 0;
+        Collections.shuffle(splits, new Random(System.nanoTime()));
+        return splits;
+    }
+
+    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    {
+        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+    }
+
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    {
+        int splitSize = ConfigHelper.getInputSplitSize(conf);
+        try
+        {
+            return describeSplits(keyspace, cfName, range, splitSize);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
+    {
+        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+        {
+            Map<TokenRange, Set<Host>> map = new HashMap<>();
+            Metadata metadata = session.getCluster().getMetadata();
+            for (TokenRange tokenRange : metadata.getTokenRanges())
+                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
+            return map;
+        }
+    }
+
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+    {
+        String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.SIZE_ESTIMATES);
+
+        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+        Row row = resultSet.one();
+        // If we have no data on this split, return the full split i.e., do not sub-split
+        // Assume smallest granularity of partition count available from CASSANDRA-7688
+        if (row == null)
+        {
+            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+            wrappedTokenRange.put(tokenRange, (long) 128);
+            return wrappedTokenRange;
+        }
+
+        long meanPartitionSize = row.getLong("mean_partition_size");
+        long partitionCount = row.getLong("partitions_count");
+
+        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+        for (TokenRange range : splitRanges)
+            rangesWithLength.put(range, partitionCount/splitCount);
+
+        return rangesWithLength;
+    }
+
+    // Old Hadoop API
+    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
+    {
+        TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
+        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
+        InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()];
+        for (int i = 0; i < newInputSplits.size(); i++)
+            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
+        return oldInputSplits;
+    }
+
+    /**
+     * Gets a token tokenRange and splits it up according to the suggested
+     * size into input splits that Hadoop can use.
+     */
+    class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
+    {
+
+        private final TokenRange tokenRange;
+        private final Set<Host> hosts;
+        private final Configuration conf;
+
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+        {
+            this.tokenRange = tr;
+            this.hosts = hosts;
+            this.conf = conf;
+        }
+
+        public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
+        {
+            ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
+            Map<TokenRange, Long> subSplits;
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+            // turn the sub-ranges into InputSplits
+            String[] endpoints = new String[hosts.size()];
+
+            // hadoop needs hostname, not ip
+            int endpointIndex = 0;
+            for (Host endpoint : hosts)
+                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+
+            boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+
+            for (TokenRange subSplit : subSplits.keySet())
+            {
+                List<TokenRange> ranges = subSplit.unwrap();
+                for (TokenRange subrange : ranges)
+                {
+                    ColumnFamilySplit split =
+                            new ColumnFamilySplit(
+                                    partitionerIsOpp ?
+                                            subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
+                                    partitionerIsOpp ?
+                                            subrange.getEnd().toString().substring(2) : subrange.getStart().toString(),
+                                    subSplits.get(subSplit),
+                                    endpoints);
+
+                    logger.debug("adding {}", split);
+                    splits.add(split);
+                }
+            }
+            return splits;
+        }
+    }
 }

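The sub-splitting in describeSplits() is driven by the system.size_estimates data from CASSANDRA-7688: splitCount = (mean_partition_size * partitions_count) / splitSize, with a fallback to a single 128-partition split when no estimate exists. A worked example of that arithmetic, using assumed figures:

public class SplitSizeExample
{
    public static void main(String[] args)
    {
        // Assumed figures, purely to illustrate the formula in describeSplits() above.
        long meanPartitionSize = 1_536;            // from system.size_estimates
        long partitionCount    = 200_000;          // partitions in the token range
        int  splitSize         = 64 * 1024 * 1024; // assumed configured split size (67,108,864)

        int  splitCount         = (int) ((meanPartitionSize * partitionCount) / splitSize); // 4
        long partitionsPerSplit = partitionCount / splitCount;                              // 50,000
        System.out.println(splitCount + " sub-splits of ~" + partitionsPerSplit + " partitions each");
    }
}
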
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 9a1cda6..cc0a6b1 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -55,6 +55,9 @@ import org.apache.hadoop.mapreduce.*;
 public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
         implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
 {
+    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
     /**
      * Check for validity of the output-specification for the job.
      *

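With BATCH_THRESHOLD and QUEUE_SIZE now defined on CqlOutputFormat (and read by CqlRecordWriter in the next hunk), a job that wants to tune write batching can set those keys directly on its Configuration. A small sketch; the values 256 and 64 are arbitrary examples, not defaults from this commit.

import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.apache.hadoop.conf.Configuration;

public class CqlOutputTuning
{
    public static Configuration tune(Configuration conf)
    {
        conf.setInt(CqlOutputFormat.QUEUE_SIZE, 256);       // mutations buffered per endpoint
        conf.setLong(CqlOutputFormat.BATCH_THRESHOLD, 64);  // upper bound on mutations per flush
        return conf;
    }
}
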
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 78b0494..c9198c6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -31,7 +31,6 @@ import com.datastax.driver.core.exceptions.*;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.utils.FBUtilities;
@@ -109,8 +108,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
     CqlRecordWriter(Configuration conf)
     {
         this.conf = conf;
-        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
-        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+        this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
 
         try

