cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [6/7] add describe_splits_ex providing improved split size estimate patch by Piotr Kolaczkowski; reviewed by jbellis for CASSANDRA-4803
Date Mon, 22 Oct 2012 18:55:18 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
new file mode 100644
index 0000000..2519f9f
--- /dev/null
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java
@@ -0,0 +1,549 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.cassandra.thrift;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents input splits used by hadoop ColumnFamilyRecordReaders
+ */
+public class CfSplit implements org.apache.thrift.TBase<CfSplit, CfSplit._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CfSplit");
+
+  private static final org.apache.thrift.protocol.TField START_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("start_token", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField END_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("end_token", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField ROW_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("row_count", org.apache.thrift.protocol.TType.I64, (short)3);
+
+  public String start_token; // required
+  public String end_token; // required
+  public long row_count; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    START_TOKEN((short)1, "start_token"),
+    END_TOKEN((short)2, "end_token"),
+    ROW_COUNT((short)3, "row_count");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // START_TOKEN
+          return START_TOKEN;
+        case 2: // END_TOKEN
+          return END_TOKEN;
+        case 3: // ROW_COUNT
+          return ROW_COUNT;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __ROW_COUNT_ISSET_ID = 0;
+  private BitSet __isset_bit_vector = new BitSet(1);
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.START_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("start_token", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.END_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("end_token", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.ROW_COUNT, new org.apache.thrift.meta_data.FieldMetaData("row_count", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfSplit.class, metaDataMap);
+  }
+
+  public CfSplit() {
+  }
+
+  public CfSplit(
+    String start_token,
+    String end_token,
+    long row_count)
+  {
+    this();
+    this.start_token = start_token;
+    this.end_token = end_token;
+    this.row_count = row_count;
+    setRow_countIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public CfSplit(CfSplit other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    if (other.isSetStart_token()) {
+      this.start_token = other.start_token;
+    }
+    if (other.isSetEnd_token()) {
+      this.end_token = other.end_token;
+    }
+    this.row_count = other.row_count;
+  }
+
+  public CfSplit deepCopy() {
+    return new CfSplit(this);
+  }
+
+  @Override
+  public void clear() {
+    this.start_token = null;
+    this.end_token = null;
+    setRow_countIsSet(false);
+    this.row_count = 0;
+  }
+
+  public String getStart_token() {
+    return this.start_token;
+  }
+
+  public CfSplit setStart_token(String start_token) {
+    this.start_token = start_token;
+    return this;
+  }
+
+  public void unsetStart_token() {
+    this.start_token = null;
+  }
+
+  /** Returns true if field start_token is set (has been assigned a value) and false otherwise */
+  public boolean isSetStart_token() {
+    return this.start_token != null;
+  }
+
+  public void setStart_tokenIsSet(boolean value) {
+    if (!value) {
+      this.start_token = null;
+    }
+  }
+
+  public String getEnd_token() {
+    return this.end_token;
+  }
+
+  public CfSplit setEnd_token(String end_token) {
+    this.end_token = end_token;
+    return this;
+  }
+
+  public void unsetEnd_token() {
+    this.end_token = null;
+  }
+
+  /** Returns true if field end_token is set (has been assigned a value) and false otherwise */
+  public boolean isSetEnd_token() {
+    return this.end_token != null;
+  }
+
+  public void setEnd_tokenIsSet(boolean value) {
+    if (!value) {
+      this.end_token = null;
+    }
+  }
+
+  public long getRow_count() {
+    return this.row_count;
+  }
+
+  public CfSplit setRow_count(long row_count) {
+    this.row_count = row_count;
+    setRow_countIsSet(true);
+    return this;
+  }
+
+  public void unsetRow_count() {
+    __isset_bit_vector.clear(__ROW_COUNT_ISSET_ID);
+  }
+
+  /** Returns true if field row_count is set (has been assigned a value) and false otherwise */
+  public boolean isSetRow_count() {
+    return __isset_bit_vector.get(__ROW_COUNT_ISSET_ID);
+  }
+
+  public void setRow_countIsSet(boolean value) {
+    __isset_bit_vector.set(__ROW_COUNT_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case START_TOKEN:
+      if (value == null) {
+        unsetStart_token();
+      } else {
+        setStart_token((String)value);
+      }
+      break;
+
+    case END_TOKEN:
+      if (value == null) {
+        unsetEnd_token();
+      } else {
+        setEnd_token((String)value);
+      }
+      break;
+
+    case ROW_COUNT:
+      if (value == null) {
+        unsetRow_count();
+      } else {
+        setRow_count((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case START_TOKEN:
+      return getStart_token();
+
+    case END_TOKEN:
+      return getEnd_token();
+
+    case ROW_COUNT:
+      return Long.valueOf(getRow_count());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case START_TOKEN:
+      return isSetStart_token();
+    case END_TOKEN:
+      return isSetEnd_token();
+    case ROW_COUNT:
+      return isSetRow_count();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof CfSplit)
+      return this.equals((CfSplit)that);
+    return false;
+  }
+
+  public boolean equals(CfSplit that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_start_token = true && this.isSetStart_token();
+    boolean that_present_start_token = true && that.isSetStart_token();
+    if (this_present_start_token || that_present_start_token) {
+      if (!(this_present_start_token && that_present_start_token))
+        return false;
+      if (!this.start_token.equals(that.start_token))
+        return false;
+    }
+
+    boolean this_present_end_token = true && this.isSetEnd_token();
+    boolean that_present_end_token = true && that.isSetEnd_token();
+    if (this_present_end_token || that_present_end_token) {
+      if (!(this_present_end_token && that_present_end_token))
+        return false;
+      if (!this.end_token.equals(that.end_token))
+        return false;
+    }
+
+    boolean this_present_row_count = true;
+    boolean that_present_row_count = true;
+    if (this_present_row_count || that_present_row_count) {
+      if (!(this_present_row_count && that_present_row_count))
+        return false;
+      if (this.row_count != that.row_count)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+
+    boolean present_start_token = true && (isSetStart_token());
+    builder.append(present_start_token);
+    if (present_start_token)
+      builder.append(start_token);
+
+    boolean present_end_token = true && (isSetEnd_token());
+    builder.append(present_end_token);
+    if (present_end_token)
+      builder.append(end_token);
+
+    boolean present_row_count = true;
+    builder.append(present_row_count);
+    if (present_row_count)
+      builder.append(row_count);
+
+    return builder.toHashCode();
+  }
+
+  public int compareTo(CfSplit other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    CfSplit typedOther = (CfSplit)other;
+
+    lastComparison = Boolean.valueOf(isSetStart_token()).compareTo(typedOther.isSetStart_token());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetStart_token()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.start_token, typedOther.start_token);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetEnd_token()).compareTo(typedOther.isSetEnd_token());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetEnd_token()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.end_token, typedOther.end_token);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetRow_count()).compareTo(typedOther.isSetRow_count());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRow_count()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_count, typedOther.row_count);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // START_TOKEN
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.start_token = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // END_TOKEN
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.end_token = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3: // ROW_COUNT
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.row_count = iprot.readI64();
+            setRow_countIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    if (!isSetRow_count()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'row_count' was not found in serialized data! Struct: " + toString());
+    }
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.start_token != null) {
+      oprot.writeFieldBegin(START_TOKEN_FIELD_DESC);
+      oprot.writeString(this.start_token);
+      oprot.writeFieldEnd();
+    }
+    if (this.end_token != null) {
+      oprot.writeFieldBegin(END_TOKEN_FIELD_DESC);
+      oprot.writeString(this.end_token);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldBegin(ROW_COUNT_FIELD_DESC);
+    oprot.writeI64(this.row_count);
+    oprot.writeFieldEnd();
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("CfSplit(");
+    boolean first = true;
+
+    sb.append("start_token:");
+    if (this.start_token == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.start_token);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("end_token:");
+    if (this.end_token == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.end_token);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("row_count:");
+    sb.append(this.row_count);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (start_token == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'start_token' was not present! Struct: " + toString());
+    }
+    if (end_token == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'end_token' was not present! Struct: " + toString());
+    }
+    // alas, we cannot check 'row_count' because it's a primitive and you chose the non-beans generator.
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bit_vector = new BitSet(1);
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 7e183c7..9d0701f 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.32.0";
+  public static final String VERSION = "19.33.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index cb79b01..c4c6570 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -35,6 +35,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.dht.IPartitioner;
@@ -44,18 +47,11 @@ import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.TokenRange;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -208,7 +204,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         public List<InputSplit> call() throws Exception
         {
             ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-            List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
+            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
             assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
             // turn the sub-ranges into InputSplits
             String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
@@ -223,15 +219,21 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
             }
 
             Token.TokenFactory factory = partitioner.getTokenFactory();
-            for (int i = 1; i < tokens.size(); i++)
+            for (CfSplit subSplit : subSplits)
             {
-                Token left = factory.fromString(tokens.get(i - 1));
-                Token right = factory.fromString(tokens.get(i));
+                Token left = factory.fromString(subSplit.getStart_token());
+                Token right = factory.fromString(subSplit.getEnd_token());
                 Range<Token> range = new Range<Token>(left, right, partitioner);
                 List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
                 for (Range<Token> subrange : ranges)
                 {
-                    ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints);
+                    ColumnFamilySplit split =
+                            new ColumnFamilySplit(
+                                    factory.toString(subrange.left),
+                                    factory.toString(subrange.right),
+                                    subSplit.getRow_count(),
+                                    endpoints);
+
                     logger.debug("adding " + split);
                     splits.add(split);
                 }
@@ -240,7 +242,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         }
     }
 
-    private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
         int splitsize = ConfigHelper.getInputSplitSize(conf);
         for (int i = 0; i < range.rpc_endpoints.size(); i++)
@@ -254,7 +256,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
             {
                 Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
                 client.set_keyspace(keyspace);
-                return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
+                return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 73f9786..c662932 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -145,12 +145,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         predicate = ConfigHelper.getInputSlicePredicate(conf);
         boolean widerows = ConfigHelper.getInputIsWide(conf);
         isEmptyPredicate = isEmptyPredicate(predicate);
-        totalRowCount = ConfigHelper.getInputSplitSize(conf);
+        totalRowCount = (int) this.split.getLength();
         batchSize = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
 
-
         keyspace = ConfigHelper.getInputKeyspace(conf);
 
         try
@@ -189,7 +188,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     public boolean nextKeyValue() throws IOException
     {
         if (!iter.hasNext())
+        {
+            logger.debug("Finished scanning " + iter.rowsRead() + " rows (estimate was: " + totalRowCount + ")");
             return false;
+        }
+
         currentRow = iter.next();
         return true;
     }
@@ -482,7 +485,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next = wideColumns.next();
             lastColumn = next.right.values().iterator().next().name();
 
-            maybeCountRow(next);
+            maybeIncreaseRowCounter(next);
             return next;
         }
 
@@ -491,7 +494,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeCountRow(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
index bd2e487..4085c68 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
@@ -33,14 +33,22 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
 {
     private String startToken;
     private String endToken;
+    private long length;
     private String[] dataNodes;
 
+    @Deprecated
     public ColumnFamilySplit(String startToken, String endToken, String[] dataNodes)
     {
+        this(startToken, endToken, Long.MAX_VALUE, dataNodes);
+    }
+
+    public ColumnFamilySplit(String startToken, String endToken, long length, String[] dataNodes)
+    {
         assert startToken != null;
         assert endToken != null;
         this.startToken = startToken;
         this.endToken = endToken;
+        this.length = length;
         this.dataNodes = dataNodes;
     }
 
@@ -58,8 +66,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
 
     public long getLength()
     {
-        // only used for sorting splits. we don't have the capability, yet.
-        return Long.MAX_VALUE;
+        return length;
     }
 
     public String[] getLocations()
@@ -76,7 +83,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
     {
         out.writeUTF(startToken);
         out.writeUTF(endToken);
-
+        out.writeLong(length);
         out.writeInt(dataNodes.length);
         for (String endpoint : dataNodes)
         {
@@ -88,6 +95,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach
     {
         startToken = in.readUTF();
         endToken = in.readUTF();
+        length = in.readLong();
 
         int numOfEndpoints = in.readInt();
         dataNodes = new String[numOfEndpoints];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index b1eaa1e..80c3f46 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -36,6 +36,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.*;
 
 import org.apache.cassandra.metrics.ClientRequestMetrics;
+
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -2184,28 +2185,50 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
-     * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into pieces of roughly keysPerSplit
+     * @return list of Token ranges (_not_ keys!) together with estimated key count,
+     *      breaking up the data this node is responsible for into pieces of roughly keysPerSplit
      */
-    public List<Token> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit)
+    public List<Pair<Range<Token>, Long>> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit)
     {
-        List<Token> tokens = new ArrayList<Token>();
-        // we use the actual Range token for the first and last brackets of the splits to ensure correctness
-        tokens.add(range.left);
-
         Table t = Table.open(table);
         ColumnFamilyStore cfs = t.getColumnFamilyStore(cfName);
         List<DecoratedKey> keys = keySamples(Collections.singleton(cfs), range);
-        int splits = keys.size() * DatabaseDescriptor.getIndexInterval() / keysPerSplit;
 
-        if (keys.size() >= splits)
+        final long totalRowCountEstimate = (keys.size() + 1) * DatabaseDescriptor.getIndexInterval();
+
+        // splitCount should be much smaller than number of key samples, to avoid huge sampling error
+        final int minSamplesPerSplit = 4;
+        final int maxSplitCount = keys.size() / minSamplesPerSplit + 1;
+        final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit)));
+
+        List<Token> tokens = keysToTokens(range, keys);
+        return getSplits(tokens, splitCount);
+    }
+
+    private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount)
+    {
+        final double step = (double) (tokens.size() - 1) / splitCount;
+        int prevIndex = 0;
+        Token prevToken = tokens.get(0);
+        List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount);
+        for (int i = 1; i <= splitCount; i++)
         {
-            for (int i = 1; i < splits; i++)
-            {
-                int index = i * (keys.size() / splits);
-                tokens.add(keys.get(index).token);
-            }
+            int index = (int) Math.round(i * step);
+            Token token = tokens.get(index);
+            long rowCountEstimate = (index - prevIndex) * DatabaseDescriptor.getIndexInterval();
+            splits.add(Pair.create(new Range<Token>(prevToken, token), rowCountEstimate));
+            prevIndex = index;
+            prevToken = token;
         }
+        return splits;
+    }
 
+    private List<Token> keysToTokens(Range<Token> range, List<DecoratedKey> keys)
+    {
+        List<Token> tokens = Lists.newArrayListWithExpectedSize(keys.size() + 2);
+        tokens.add(range.left);
+        for (DecoratedKey key : keys)
+            tokens.add(key.token);
         tokens.add(range.right);
         return tokens;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index ad416f3..3bf155e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,6 +30,8 @@ import java.util.zip.Inflater;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.hadoop.ColumnFamilySplit;
+import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -882,18 +884,33 @@ public class CassandraServer implements Cassandra.Iface
         return DatabaseDescriptor.getEndpointSnitch().getClass().getName();
     }
 
+    @Deprecated
     public List<String> describe_splits(String cfName, String start_token, String end_token, int keys_per_split)
     throws TException, InvalidRequestException
     {
+        List<CfSplit> splits = describe_splits_ex(cfName, start_token, end_token, keys_per_split);
+        List<String> result = new ArrayList<String>(splits.size() + 1);
+
+        result.add(splits.get(0).getStart_token());
+        for (CfSplit cfSplit : splits)
+            result.add(cfSplit.getEnd_token());
+
+        return result;
+    }
+
+    @Override
+    public List<CfSplit> describe_splits_ex(String cfName, String start_token, String end_token, int keys_per_split)
+    throws InvalidRequestException, TException
+    {
         // TODO: add keyspace authorization call post CASSANDRA-1425
         Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
-        List<Token> tokens = StorageService.instance.getSplits(state().getKeyspace(), cfName, new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
-        List<String> splits = new ArrayList<String>(tokens.size());
-        for (Token token : tokens)
-        {
-            splits.add(tf.toString(token));
-        }
-        return splits;
+        Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token));
+        List<Pair<Range<Token>, Long>> splits =
+                StorageService.instance.getSplits(state().getKeyspace(), cfName, tr, keys_per_split);
+        List<CfSplit> result = new ArrayList<CfSplit>(splits.size());
+        for (Pair<Range<Token>, Long> split : splits)
+            result.add(new CfSplit(split.left.left.toString(), split.left.right.toString(), split.right));
+        return result;
     }
 
     public void login(AuthenticationRequest auth_request) throws AuthenticationException, AuthorizationException, TException


Mime
View raw message