Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6C7B49814 for ; Wed, 11 Apr 2012 18:26:46 +0000 (UTC) Received: (qmail 59273 invoked by uid 500); 11 Apr 2012 18:26:44 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 59148 invoked by uid 500); 11 Apr 2012 18:26:44 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 58847 invoked by uid 99); 11 Apr 2012 18:26:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Apr 2012 18:26:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 89652C691; Wed, 11 Apr 2012 18:26:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [6/21] git commit: update get_paged_slice to allow starting with a key; fixes for WideRowIterator patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883 Message-Id: <20120411182643.89652C691@tyr.zones.apache.org> Date: Wed, 11 Apr 2012 18:26:43 +0000 (UTC) update get_paged_slice to allow starting with a key; fixes for WideRowIterator patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3883 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97aa922a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97aa922a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97aa922a Branch: refs/heads/cassandra-1.1.0 Commit: 97aa922a7476dce06121ae289877abccf161afae Parents: dbc0f59 Author: Jonathan Ellis Authored: Tue Apr 10 16:06:39 2012 -0500 Committer: Jonathan Ellis Committed: Wed Apr 11 13:24:56 2012 -0500 ---------------------------------------------------------------------- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 25 ++- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 136 ++++++++------ .../apache/cassandra/thrift/CassandraServer.java | 5 +- .../apache/cassandra/thrift/ThriftValidation.java | 19 +-- 4 files changed, 108 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index ef56678..354903d 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -34,6 +34,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.collect.ImmutableList; + import org.apache.cassandra.db.IColumn; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -87,6 +89,7 @@ public class ColumnFamilyInputFormat extends InputFormat getSplits(JobContext context) throws IOException @@ -115,6 +118,8 @@ public class ColumnFamilyInputFormat extends InputFormat>> splitfutures = new ArrayList>>(); KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); - IPartitioner partitioner = null; Range jobRange = null; if (jobKeyRange != null && jobKeyRange.start_token != null) { - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner"; assert jobKeyRange.start_key == null : "only start_token supported"; assert jobKeyRange.end_key == null : "only end_token supported"; @@ -219,11 +222,19 @@ public class ColumnFamilyInputFormat extends InputFormat range = new Range(left, right, partitioner); + List> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range); + for (Range subrange : ranges) + { + ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints); + logger.debug("adding " + split); + splits.add(split); + } } return splits; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 483c040..600cf13 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -29,10 +29,9 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Iterables; -import org.apache.commons.lang.ArrayUtils; +import com.google.common.collect.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.ConfigurationException; @@ -55,6 +54,8 @@ import org.apache.thrift.transport.TSocket; public class ColumnFamilyRecordReader extends RecordReader> implements org.apache.hadoop.mapred.RecordReader> { + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class); + public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; private ColumnFamilySplit split; @@ -179,6 +180,7 @@ public class ColumnFamilyRecordReader extends RecordReader>> { protected List rows; - protected KeySlice lastRow; protected int totalRead = 0; - protected int i = 0; protected final AbstractType comparator; protected final AbstractType subComparator; protected final IPartitioner partitioner; @@ -299,7 +299,7 @@ public class ColumnFamilyRecordReader extends RecordReader= rows.size()) - rows = null; - - if (rows != null) + if (rows != null && i < rows.size()) return; String startToken; - if (lastRow == null) + if (totalRead == 0) { + // first request startToken = split.getStartToken(); } else { - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key)); + startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key)); if (startToken.equals(split.getEndToken())) { // reached end of the split @@ -362,9 +362,6 @@ public class ColumnFamilyRecordReader extends RecordReader wideColumns; + private PeekingIterator>> wideColumns; private void maybeInit() { if (wideColumns != null && wideColumns.hasNext()) return; - // check if we need another batch - if (rows != null && ++i >= rows.size()) - rows = null; - - if (rows != null) - { - wideColumns = rows.get(i).columns.iterator(); - return; - } - - String startToken; + KeyRange keyRange; ByteBuffer startColumn; - if (lastRow == null) + if (totalRead == 0) { - startToken = split.getStartToken(); + String startToken = split.getStartToken(); + keyRange = new KeyRange(batchSize) + .setStart_token(startToken) + .setEnd_token(split.getEndToken()) + .setRow_filter(filter); startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; } else { - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.key)); + KeySlice lastRow = Iterables.getLast(rows); + logger.debug("Starting with last-seen row {}", lastRow.key); + keyRange = new KeyRange(batchSize) + .setStart_key(lastRow.key) + .setEnd_token(split.getEndToken()) + .setRow_filter(filter); startColumn = Iterables.getLast(lastRow.columns).column.name; } - KeyRange keyRange = new KeyRange(batchSize) - .setStart_token(startToken) - .setEnd_token(split.getEndToken()) - .setRow_filter(filter); try { rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel); - - // nothing found? - if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty()) - { - rows = null; - return; - } - - // nothing new? reached the end - if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.name.equals(startColumn))) - { + int n = 0; + for (KeySlice row : rows) + n += row.columns.size(); + logger.debug("read {} columns in {} rows for {} starting with {}", + new Object[]{ n, rows.size(), keyRange, startColumn }); + + wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows)); + if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(startColumn)) + wideColumns.next(); + if (!wideColumns.hasNext()) rows = null; - return; - } - - // prepare for the next slice to be read - lastRow = Iterables.getLast(rows); - - // reset to iterate through this new batch - i = 0; - wideColumns = rows.get(i).columns.iterator(); } catch (Exception e) { @@ -487,9 +469,47 @@ public class ColumnFamilyRecordReader extends RecordReader map = ImmutableSortedMap.of(cosc.column.name, unthriftify(cosc)); - return Pair.>create(rows.get(i).key, map); + return wideColumns.next(); + } + + private class WideColumnIterator extends AbstractIterator>> + { + private final Iterator rows; + private Iterator columns; + public KeySlice currentRow; + + public WideColumnIterator(List rows) + { + this.rows = rows.iterator(); + if (this.rows.hasNext()) + nextRow(); + else + columns = Iterators.emptyIterator(); + } + + private void nextRow() + { + currentRow = rows.next(); + columns = currentRow.columns.iterator(); + } + + protected Pair> computeNext() + { + while (true) + { + if (columns.hasNext()) + { + ColumnOrSuperColumn cosc = columns.next(); + ImmutableSortedMap map = ImmutableSortedMap.of(cosc.column.name, unthriftifySimple(cosc.column)); + return Pair.>create(currentRow.key, map); + } + + if (!rows.hasNext()) + return endOfData(); + + nextRow(); + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 7aceb0e..7cb77d7 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -733,6 +733,7 @@ public class CassandraServer implements Cassandra.Iface AbstractBounds bounds; if (range.start_key == null) { + // (token, key) is unsupported, assume (token, token) Token.TokenFactory tokenFactory = p.getTokenFactory(); Token left = tokenFactory.fromString(range.start_token); Token right = tokenFactory.fromString(range.end_token); @@ -740,7 +741,9 @@ public class CassandraServer implements Cassandra.Iface } else { - bounds = new Bounds(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p)); + RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token).maxKeyBound(p) + : RowPosition.forKey(range.end_key, p); + bounds = new Bounds(RowPosition.forKey(range.start_key, p), end); } List rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/97aa922a/src/java/org/apache/cassandra/thrift/ThriftValidation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java index 25c751c..a77bfb6 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java +++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java @@ -479,20 +479,17 @@ public class ThriftValidation public static void validateKeyRange(CFMetaData metadata, ByteBuffer superColumn, KeyRange range) throws InvalidRequestException { - if ((range.start_key == null) != (range.end_key == null)) - { - throw new InvalidRequestException("start key and end key must either both be non-null, or both be null"); - } - if ((range.start_token == null) != (range.end_token == null)) - { - throw new InvalidRequestException("start token and end token must either both be non-null, or both be null"); - } - if ((range.start_key == null) == (range.start_token == null)) + if ((range.start_key == null) == (range.start_token == null) + || (range.end_key == null) == (range.end_token == null)) { throw new InvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified"); } - if (range.start_key != null) + // (key, token) is supported (for wide-row CFRR) but not (token, key) + if (range.start_token != null && range.end_key != null) + throw new InvalidRequestException("start token + end key is not a supported key range"); + + if (range.start_key != null && range.end_key != null) { IPartitioner p = StorageService.getPartitioner(); Token startToken = p.getToken(range.start_key); @@ -510,7 +507,7 @@ public class ThriftValidation if (!isEmpty(range.row_filter) && superColumn != null) { - throw new InvalidRequestException("super columns are not yet supported for indexing"); + throw new InvalidRequestException("super columns are not supported for indexing"); } if (range.count <= 0)