cassandra-commits mailing list archives

From jbel...@apache.org
Subject svn commit: r909631 - in /incubator/cassandra/trunk: interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/service/ src/java/org/apache/c...
Date Fri, 12 Feb 2010 22:02:46 GMT
Author: jbellis
Date: Fri Feb 12 22:02:44 2010
New Revision: 909631

URL: http://svn.apache.org/viewvc?rev=909631&view=rev
Log:
sub splits
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342

Modified:
    incubator/cassandra/trunk/interface/cassandra.thrift
    incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Fri Feb 12 22:02:44 2010
@@ -434,6 +434,7 @@
       to list of endpoints, because you can't use Thrift structs as
       map keys:
       https://issues.apache.org/jira/browse/THRIFT-162 
+
       for the same reason, we can't return a set here, even though
       order is neither important nor predictable. */
   list<TokenRange> describe_ring(1:required string keyspace),
@@ -441,4 +442,13 @@
   /** describe specified keyspace */
   map<string, map<string, string>> describe_keyspace(1:required string keyspace)
                                    throws (1:NotFoundException nfe),
+
+  /** experimental API for hadoop/parallel query support.  
+      may change violently and without warning. 
+
+      returns list of token strings such that first subrange is (list[0], list[1]],
+      next is (list[1], list[2]], etc. */
+  list<string> describe_splits(1:required string start_token, 
+  	                       2:required string end_token,
+                               3:required i32 keys_per_split),
 }
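For orientation (not part of the patch): a minimal Java sketch of how a client might consume describe_splits, assuming an already-connected Cassandra.Client named client and placeholder token strings.

    // Hypothetical usage sketch; "client" is an open Cassandra.Client, the token strings are placeholders.
    List<String> tokens = client.describe_splits("startToken", "endToken", 128);
    for (int i = 1; i < tokens.size(); i++)
    {
        // each consecutive pair of tokens bounds one sub-range: exclusive start, inclusive end
        System.out.println("sub-range (" + tokens.get(i - 1) + ", " + tokens.get(i) + "]");
    }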

Modified: incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Fri Feb 12 22:02:44 2010
@@ -187,6 +187,7 @@
      * to list of endpoints, because you can't use Thrift structs as
      * map keys:
      * https://issues.apache.org/jira/browse/THRIFT-162
+     * 
      * for the same reason, we can't return a set here, even though
      * order is neither important nor predictable.
      * 
@@ -201,6 +202,19 @@
      */
     public Map<String,Map<String,String>> describe_keyspace(String keyspace) throws NotFoundException, TException;
 
+    /**
+     * experimental API for hadoop/parallel query support.
+     * may change violently and without warning.
+     * 
+     * returns list of token strings such that first subrange is (list[0], list[1]],
+     * next is (list[1], list[2]], etc.
+     * 
+     * @param start_token
+     * @param end_token
+     * @param keys_per_split
+     */
+    public List<String> describe_splits(String start_token, String end_token, int keys_per_split) throws TException;
+
   }
 
   public static class Client implements Iface {
@@ -992,6 +1006,41 @@
       throw new TApplicationException(TApplicationException.MISSING_RESULT, "describe_keyspace failed: unknown result");
     }
 
+    public List<String> describe_splits(String start_token, String end_token, int keys_per_split) throws TException
+    {
+      send_describe_splits(start_token, end_token, keys_per_split);
+      return recv_describe_splits();
+    }
+
+    public void send_describe_splits(String start_token, String end_token, int keys_per_split) throws TException
+    {
+      oprot_.writeMessageBegin(new TMessage("describe_splits", TMessageType.CALL, seqid_));
+      describe_splits_args args = new describe_splits_args();
+      args.start_token = start_token;
+      args.end_token = end_token;
+      args.keys_per_split = keys_per_split;
+      args.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    public List<String> recv_describe_splits() throws TException
+    {
+      TMessage msg = iprot_.readMessageBegin();
+      if (msg.type == TMessageType.EXCEPTION) {
+        TApplicationException x = TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw x;
+      }
+      describe_splits_result result = new describe_splits_result();
+      result.read(iprot_);
+      iprot_.readMessageEnd();
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new TApplicationException(TApplicationException.MISSING_RESULT, "describe_splits failed: unknown result");
+    }
+
   }
   public static class Processor implements TProcessor {
     private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
@@ -1017,6 +1066,7 @@
       processMap_.put("describe_version", new describe_version());
       processMap_.put("describe_ring", new describe_ring());
       processMap_.put("describe_keyspace", new describe_keyspace());
+      processMap_.put("describe_splits", new describe_splits());
     }
 
     protected static interface ProcessFunction {
@@ -1553,6 +1603,22 @@
 
     }
 
+    private class describe_splits implements ProcessFunction {
+      public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
+      {
+        describe_splits_args args = new describe_splits_args();
+        args.read(iprot);
+        iprot.readMessageEnd();
+        describe_splits_result result = new describe_splits_result();
+        result.success = iface_.describe_splits(args.start_token, args.end_token, args.keys_per_split);
+        oprot.writeMessageBegin(new TMessage("describe_splits", TMessageType.REPLY, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+
+    }
+
   }
 
   public static class login_args implements TBase<login_args._Fields>, java.io.Serializable, Cloneable   {
@@ -19052,4 +19118,781 @@
 
   }
 
+  public static class describe_splits_args implements TBase<describe_splits_args._Fields>, java.io.Serializable, Cloneable, Comparable<describe_splits_args>   {
+    private static final TStruct STRUCT_DESC = new TStruct("describe_splits_args");
+
+    private static final TField START_TOKEN_FIELD_DESC = new TField("start_token", TType.STRING, (short)1);
+    private static final TField END_TOKEN_FIELD_DESC = new TField("end_token", TType.STRING, (short)2);
+    private static final TField KEYS_PER_SPLIT_FIELD_DESC = new TField("keys_per_split", TType.I32, (short)3);
+
+    public String start_token;
+    public String end_token;
+    public int keys_per_split;
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+      START_TOKEN((short)1, "start_token"),
+      END_TOKEN((short)2, "end_token"),
+      KEYS_PER_SPLIT((short)3, "keys_per_split");
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __KEYS_PER_SPLIT_ISSET_ID = 0;
+    private BitSet __isset_bit_vector = new BitSet(1);
+
+    public static final Map<_Fields, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) {{
+      put(_Fields.START_TOKEN, new FieldMetaData("start_token", TFieldRequirementType.REQUIRED,
+          new FieldValueMetaData(TType.STRING)));
+      put(_Fields.END_TOKEN, new FieldMetaData("end_token", TFieldRequirementType.REQUIRED,
+          new FieldValueMetaData(TType.STRING)));
+      put(_Fields.KEYS_PER_SPLIT, new FieldMetaData("keys_per_split", TFieldRequirementType.REQUIRED,
+          new FieldValueMetaData(TType.I32)));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(describe_splits_args.class, metaDataMap);
+    }
+
+    public describe_splits_args() {
+    }
+
+    public describe_splits_args(
+      String start_token,
+      String end_token,
+      int keys_per_split)
+    {
+      this();
+      this.start_token = start_token;
+      this.end_token = end_token;
+      this.keys_per_split = keys_per_split;
+      setKeys_per_splitIsSet(true);
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public describe_splits_args(describe_splits_args other) {
+      __isset_bit_vector.clear();
+      __isset_bit_vector.or(other.__isset_bit_vector);
+      if (other.isSetStart_token()) {
+        this.start_token = other.start_token;
+      }
+      if (other.isSetEnd_token()) {
+        this.end_token = other.end_token;
+      }
+      this.keys_per_split = other.keys_per_split;
+    }
+
+    public describe_splits_args deepCopy() {
+      return new describe_splits_args(this);
+    }
+
+    @Deprecated
+    public describe_splits_args clone() {
+      return new describe_splits_args(this);
+    }
+
+    public String getStart_token() {
+      return this.start_token;
+    }
+
+    public describe_splits_args setStart_token(String start_token) {
+      this.start_token = start_token;
+      return this;
+    }
+
+    public void unsetStart_token() {
+      this.start_token = null;
+    }
+
+    /** Returns true if field start_token is set (has been assigned a value) and false otherwise */
+    public boolean isSetStart_token() {
+      return this.start_token != null;
+    }
+
+    public void setStart_tokenIsSet(boolean value) {
+      if (!value) {
+        this.start_token = null;
+      }
+    }
+
+    public String getEnd_token() {
+      return this.end_token;
+    }
+
+    public describe_splits_args setEnd_token(String end_token) {
+      this.end_token = end_token;
+      return this;
+    }
+
+    public void unsetEnd_token() {
+      this.end_token = null;
+    }
+
+    /** Returns true if field end_token is set (has been assigned a value) and false otherwise */
+    public boolean isSetEnd_token() {
+      return this.end_token != null;
+    }
+
+    public void setEnd_tokenIsSet(boolean value) {
+      if (!value) {
+        this.end_token = null;
+      }
+    }
+
+    public int getKeys_per_split() {
+      return this.keys_per_split;
+    }
+
+    public describe_splits_args setKeys_per_split(int keys_per_split) {
+      this.keys_per_split = keys_per_split;
+      setKeys_per_splitIsSet(true);
+      return this;
+    }
+
+    public void unsetKeys_per_split() {
+      __isset_bit_vector.clear(__KEYS_PER_SPLIT_ISSET_ID);
+    }
+
+    /** Returns true if field keys_per_split is set (has been assigned a value) and false otherwise */
+    public boolean isSetKeys_per_split() {
+      return __isset_bit_vector.get(__KEYS_PER_SPLIT_ISSET_ID);
+    }
+
+    public void setKeys_per_splitIsSet(boolean value) {
+      __isset_bit_vector.set(__KEYS_PER_SPLIT_ISSET_ID, value);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case START_TOKEN:
+        if (value == null) {
+          unsetStart_token();
+        } else {
+          setStart_token((String)value);
+        }
+        break;
+
+      case END_TOKEN:
+        if (value == null) {
+          unsetEnd_token();
+        } else {
+          setEnd_token((String)value);
+        }
+        break;
+
+      case KEYS_PER_SPLIT:
+        if (value == null) {
+          unsetKeys_per_split();
+        } else {
+          setKeys_per_split((Integer)value);
+        }
+        break;
+
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case START_TOKEN:
+        return getStart_token();
+
+      case END_TOKEN:
+        return getEnd_token();
+
+      case KEYS_PER_SPLIT:
+        return new Integer(getKeys_per_split());
+
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      case START_TOKEN:
+        return isSetStart_token();
+      case END_TOKEN:
+        return isSetEnd_token();
+      case KEYS_PER_SPLIT:
+        return isSetKeys_per_split();
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof describe_splits_args)
+        return this.equals((describe_splits_args)that);
+      return false;
+    }
+
+    public boolean equals(describe_splits_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_start_token = true && this.isSetStart_token();
+      boolean that_present_start_token = true && that.isSetStart_token();
+      if (this_present_start_token || that_present_start_token) {
+        if (!(this_present_start_token && that_present_start_token))
+          return false;
+        if (!this.start_token.equals(that.start_token))
+          return false;
+      }
+
+      boolean this_present_end_token = true && this.isSetEnd_token();
+      boolean that_present_end_token = true && that.isSetEnd_token();
+      if (this_present_end_token || that_present_end_token) {
+        if (!(this_present_end_token && that_present_end_token))
+          return false;
+        if (!this.end_token.equals(that.end_token))
+          return false;
+      }
+
+      boolean this_present_keys_per_split = true;
+      boolean that_present_keys_per_split = true;
+      if (this_present_keys_per_split || that_present_keys_per_split) {
+        if (!(this_present_keys_per_split && that_present_keys_per_split))
+          return false;
+        if (this.keys_per_split != that.keys_per_split)
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(describe_splits_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      describe_splits_args typedOther = (describe_splits_args)other;
+
+      lastComparison = Boolean.valueOf(isSetStart_token()).compareTo(typedOther.isSetStart_token());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = TBaseHelper.compareTo(start_token, typedOther.start_token);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = Boolean.valueOf(isSetEnd_token()).compareTo(typedOther.isSetEnd_token());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = TBaseHelper.compareTo(end_token, typedOther.end_token);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = Boolean.valueOf(isSetKeys_per_split()).compareTo(typedOther.isSetKeys_per_split());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = TBaseHelper.compareTo(keys_per_split, typedOther.keys_per_split);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        _Fields fieldId = _Fields.findByThriftId(field.id);
+        if (fieldId == null) {
+          TProtocolUtil.skip(iprot, field.type);
+        } else {
+          switch (fieldId) {
+            case START_TOKEN:
+              if (field.type == TType.STRING) {
+                this.start_token = iprot.readString();
+              } else { 
+                TProtocolUtil.skip(iprot, field.type);
+              }
+              break;
+            case END_TOKEN:
+              if (field.type == TType.STRING) {
+                this.end_token = iprot.readString();
+              } else { 
+                TProtocolUtil.skip(iprot, field.type);
+              }
+              break;
+            case KEYS_PER_SPLIT:
+              if (field.type == TType.I32) {
+                this.keys_per_split = iprot.readI32();
+                setKeys_per_splitIsSet(true);
+              } else { 
+                TProtocolUtil.skip(iprot, field.type);
+              }
+              break;
+          }
+          iprot.readFieldEnd();
+        }
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      if (!isSetKeys_per_split()) {
+        throw new TProtocolException("Required field 'keys_per_split' was not found in serialized data! Struct: " + toString());
+      }
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (this.start_token != null) {
+        oprot.writeFieldBegin(START_TOKEN_FIELD_DESC);
+        oprot.writeString(this.start_token);
+        oprot.writeFieldEnd();
+      }
+      if (this.end_token != null) {
+        oprot.writeFieldBegin(END_TOKEN_FIELD_DESC);
+        oprot.writeString(this.end_token);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(KEYS_PER_SPLIT_FIELD_DESC);
+      oprot.writeI32(this.keys_per_split);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("describe_splits_args(");
+      boolean first = true;
+
+      sb.append("start_token:");
+      if (this.start_token == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.start_token);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("end_token:");
+      if (this.end_token == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.end_token);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("keys_per_split:");
+      sb.append(this.keys_per_split);
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+      if (start_token == null) {
+        throw new TProtocolException("Required field 'start_token' was not present! Struct: " + toString());
+      }
+      if (end_token == null) {
+        throw new TProtocolException("Required field 'end_token' was not present! Struct: " + toString());
+      }
+      // alas, we cannot check 'keys_per_split' because it's a primitive and you chose the non-beans generator.
+    }
+
+  }
+
+  public static class describe_splits_result implements TBase<describe_splits_result._Fields>, java.io.Serializable, Cloneable, Comparable<describe_splits_result>   {
+    private static final TStruct STRUCT_DESC = new TStruct("describe_splits_result");
+
+    private static final TField SUCCESS_FIELD_DESC = new TField("success", TType.LIST, (short)0);
+
+    public List<String> success;
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>();
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byId.put((int)field._thriftId, field);
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        return byId.get(fieldId);
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+
+    public static final Map<_Fields, FieldMetaData> metaDataMap = Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class) {{
+      put(_Fields.SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT, 
+          new ListMetaData(TType.LIST, 
+              new FieldValueMetaData(TType.STRING))));
+    }});
+
+    static {
+      FieldMetaData.addStructMetaDataMap(describe_splits_result.class, metaDataMap);
+    }
+
+    public describe_splits_result() {
+    }
+
+    public describe_splits_result(
+      List<String> success)
+    {
+      this();
+      this.success = success;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public describe_splits_result(describe_splits_result other) {
+      if (other.isSetSuccess()) {
+        List<String> __this__success = new ArrayList<String>();
+        for (String other_element : other.success) {
+          __this__success.add(other_element);
+        }
+        this.success = __this__success;
+      }
+    }
+
+    public describe_splits_result deepCopy() {
+      return new describe_splits_result(this);
+    }
+
+    @Deprecated
+    public describe_splits_result clone() {
+      return new describe_splits_result(this);
+    }
+
+    public int getSuccessSize() {
+      return (this.success == null) ? 0 : this.success.size();
+    }
+
+    public java.util.Iterator<String> getSuccessIterator() {
+      return (this.success == null) ? null : this.success.iterator();
+    }
+
+    public void addToSuccess(String elem) {
+      if (this.success == null) {
+        this.success = new ArrayList<String>();
+      }
+      this.success.add(elem);
+    }
+
+    public List<String> getSuccess() {
+      return this.success;
+    }
+
+    public describe_splits_result setSuccess(List<String> success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((List<String>)value);
+        }
+        break;
+
+      }
+    }
+
+    public void setFieldValue(int fieldID, Object value) {
+      setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    public Object getFieldValue(int fieldId) {
+      return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    public boolean isSet(int fieldID) {
+      return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof describe_splits_result)
+        return this.equals((describe_splits_result)that);
+      return false;
+    }
+
+    public boolean equals(describe_splits_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(describe_splits_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      describe_splits_result typedOther = (describe_splits_result)other;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      lastComparison = TBaseHelper.compareTo(success, typedOther.success);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      return 0;
+    }
+
+    public void read(TProtocol iprot) throws TException {
+      TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == TType.STOP) { 
+          break;
+        }
+        _Fields fieldId = _Fields.findByThriftId(field.id);
+        if (fieldId == null) {
+          TProtocolUtil.skip(iprot, field.type);
+        } else {
+          switch (fieldId) {
+            case SUCCESS:
+              if (field.type == TType.LIST) {
+                {
+                  TList _list100 = iprot.readListBegin();
+                  this.success = new ArrayList<String>(_list100.size);
+                  for (int _i101 = 0; _i101 < _list100.size; ++_i101)
+                  {
+                    String _elem102;
+                    _elem102 = iprot.readString();
+                    this.success.add(_elem102);
+                  }
+                  iprot.readListEnd();
+                }
+              } else { 
+                TProtocolUtil.skip(iprot, field.type);
+              }
+              break;
+          }
+          iprot.readFieldEnd();
+        }
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(TProtocol oprot) throws TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (this.isSetSuccess()) {
+        oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+        {
+          oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
+          for (String _iter103 : this.success)
+          {
+            oprot.writeString(_iter103);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("describe_splits_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws TException {
+      // check for required fields
+    }
+
+  }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Feb 12 22:02:44 2010
@@ -205,12 +205,11 @@
         public void doVerb(Message message)
         {
             StorageService ss = StorageService.instance;
-            List<String> tokens = ss.getSplits(2);
-            assert tokens.size() == 3 : tokens.size();
+            String tokenString = ss.getBootstrapToken().toString();
             Message response;
             try
             {
-                response = message.getReply(FBUtilities.getLocalAddress(), tokens.get(1).getBytes("UTF-8"));
+                response = message.getReply(FBUtilities.getLocalAddress(), tokenString.getBytes("UTF-8"));
             }
             catch (UnsupportedEncodingException e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Feb 12 22:02:44 2010
@@ -1,16 +1,22 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
+import java.net.InetAddress;
+import java.util.*;
 
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TDeserializer;
@@ -113,16 +119,63 @@
         predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
         validateConfiguration();
 
-        List<TokenRange> map = getRangeMap();
+        // canonical ranges and nodes holding replicas
+        List<TokenRange> masterRangeNodes = getRangeMap();
+
+        // canonical ranges, split into pieces:
+        // for each range, pick a live owner and ask it to compute bite-sized splits
+        // TODO parallelize this thread-per-range
+        Map<TokenRange, List<String>> splitRanges = new HashMap<TokenRange, List<String>>();
+        for (TokenRange range : masterRangeNodes)
+        {
+            splitRanges.put(range, getSubSplits(range));
+        }
+
+        // turn the sub-ranges into InputSplits
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-        for (TokenRange entry : map)
+        for (Map.Entry<TokenRange, List<String>> entry : splitRanges.entrySet())
         {
-            if (logger.isDebugEnabled())
-                logger.debug("split range is [" + entry.start_token + ", " + entry.end_token + "]");
-            String[] endpoints = entry.endpoints.toArray(new String[0]);
-            splits.add(new ColumnFamilySplit(keyspace, columnFamily, predicate, entry.start_token, entry.end_token, endpoints));
+            TokenRange range = entry.getKey();
+            List<String> tokens = entry.getValue();
+            String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
+
+            int i = 1;
+            for ( ; i < tokens.size(); i++)
+            {
+                ColumnFamilySplit split = new ColumnFamilySplit(keyspace, columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
+                logger.info("adding " + split);
+                splits.add(split);
+            }
         }
+        assert splits.size() > 0;
+        
+        return splits;
+    }
 
+    private List<String> getSubSplits(TokenRange range) throws IOException
+    {
+        // TODO handle failure of range replicas & retry
+        TSocket socket = new TSocket(range.endpoints.get(0),
+                                     DatabaseDescriptor.getThriftPort());
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        try
+        {
+            socket.open();
+        }
+        catch (TTransportException e)
+        {
+            throw new IOException(e);
+        }
+        List<String> splits;
+        try
+        {
+            splits = client.describe_splits(range.start_token, range.end_token, 128); // TODO make split size configurable
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
         return splits;
     }
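An aside on the "TODO make split size configurable" above (not part of this commit; the config key is invented): one way to do it would be to read the value from the Hadoop Configuration that getSplits already holds, assuming it is passed down to getSubSplits, e.g.

    // Hypothetical sketch only; "cassandra.input.split.size" is an invented key.
    // Configuration.getInt(key, default) falls back to the default when the key is unset.
    int splitSize = conf.getInt("cassandra.input.split.size", 128);
    splits = client.describe_splits(range.start_token, range.end_token, splitSize);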
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Fri Feb 12 22:02:44 2010
@@ -3,6 +3,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -111,7 +112,20 @@
             dataNodes[i] = in.readUTF();
         }
     }
-    
+
+    @Override
+    public String toString()
+    {
+        return "ColumnFamilySplit{" +
+               "startToken='" + startToken + '\'' +
+               ", endToken='" + endToken + '\'' +
+               ", table='" + table + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", dataNodes=" + (dataNodes == null ? null : Arrays.asList(dataNodes)) +
+               ", predicate=" + predicate +
+               '}';
+    }
+
     public static ColumnFamilySplit read(DataInput in) throws IOException
     {
         ColumnFamilySplit w = new ColumnFamilySplit();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Feb 12 22:02:44 2010
@@ -40,6 +40,7 @@
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -102,6 +103,7 @@
         GOSSIP_DIGEST_SYN,
         GOSSIP_DIGEST_ACK,
         GOSSIP_DIGEST_ACK2,
+        ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
     public static final Verb[] VERBS = Verb.values();
@@ -1219,20 +1221,13 @@
     }
 
     /**
-     * @param splits: number of ranges to break into. Minimum 2.
-     * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into `splits` pieces.
-     * There will be 1 more token than splits requested.  So for splits of 2, tokens T1 T2 T3 will be returned,
-     * where (T1, T2] is the first range and (T2, T3] is the second.  The first token will always be the left
-     * Token of this node's primary range, and the last will always be the Right token of that range.
-     */
-    public List<String> getSplits(int splits)
+     * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into pieces of roughly keysPerSplit
+     */ 
+    public List<Token> getSplits(Range range, int keysPerSplit)
     {
-        assert splits > 1;
+        List<Token> tokens = new ArrayList<Token>();
         // we use the actual Range token for the first and last brackets of the splits to ensure correctness
-        // (we're only operating on 1/128 of the keys remember)
-        Range range = getLocalPrimaryRange();
-        List<String> tokens = new ArrayList<String>();
-        tokens.add(range.left.toString());
+        tokens.add(range.left);
 
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
@@ -1243,28 +1238,41 @@
                     keys.add(info.key);
             }
         }
-        Collections.sort(keys);
+        FBUtilities.sortSampledKeys(keys, range);
+        int splits = keys.size() * SSTableReader.indexInterval() / keysPerSplit;
 
-        if (keys.size() < splits)
+        if (keys.size() >= splits)
         {
-            // not enough keys to generate good splits -- generate random ones instead
-            // (since this only happens when we don't have many keys, it doesn't really matter that the splits are poor)
             for (int i = 1; i < splits; i++)
             {
-                tokens.add(partitioner_.getRandomToken().toString());
+                int index = i * (keys.size() / splits);
+                tokens.add(keys.get(index).token);
             }
         }
-        else
+
+        tokens.add(range.right);
+        return tokens;
+    }
+
+    /** return a token such that, if a node bootstraps to it, it will take over about 1/2 of this node's range */
+    public Token getBootstrapToken()
+    {
+        Range range = getLocalPrimaryRange();
+        List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            for (int i = 1; i < splits; i++)
+            for (SSTable.KeyPosition info: cfs.allIndexPositions())
             {
-                int index = i * (keys.size() / splits);
-                tokens.add(keys.get(index).token.toString());
+                if (range.contains(info.key.token))
+                    keys.add(info.key);
             }
         }
+        FBUtilities.sortSampledKeys(keys, range);
 
-        tokens.add(range.right.toString());
-        return tokens;
+        if (keys.size() < 3)
+            return partitioner_.getRandomToken();
+        else
+            return keys.get(keys.size() / 2).token;
     }
 
     /**
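For intuition, a rough worked example of the new getSplits sizing (hypothetical numbers, not from the patch, and assuming an index interval of 128): a node with 4096 sampled index keys in the requested range and keysPerSplit = 65536 gives

    // splits = keys.size() * SSTableReader.indexInterval() / keysPerSplit
    int splits = 4096 * 128 / 65536;   // = 8
    // getSplits then returns 8 + 1 = 9 tokens: range.left, 7 interior boundaries, and range.right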

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Feb 12 22:02:44 2010
@@ -633,6 +633,18 @@
         return ranges;
     }
 
+    public List<String> describe_splits(String start_token, String end_token, int keys_per_split) throws TException
+    {
+        Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
+        List<Token> tokens = StorageService.instance.getSplits(new Range(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
+        List<String> splits = new ArrayList<String>(tokens.size());
+        for (Token token : tokens)
+        {
+            splits.add(token.toString());
+        }
+        return splits;
+    }
+
     public void login(String keyspace, AuthenticationRequest auth_request) throws AuthenticationException, AuthorizationException, TException
     {
         DatabaseDescriptor.getAuthenticator().login(keyspace, auth_request);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Fri Feb 12 22:02:44 2010
@@ -35,6 +35,9 @@
 import org.apache.commons.collections.iterators.CollatingIterator;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
@@ -384,4 +387,32 @@
             throw new IOException(ex);
         }
     }
+
+    public static void sortSampledKeys(List<DecoratedKey> keys, Range range)
+    {
+        if (range.left.compareTo(range.right) >= 0)
+        {
+            // range wraps.  have to be careful that we sort in the same order as the range to find the right midpoint.
+            final Token right = range.right;
+            Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>()
+            {
+                public int compare(DecoratedKey o1, DecoratedKey o2)
+                {
+                    if ((right.compareTo(o1.token) < 0 && right.compareTo(o2.token) < 0)
+                        || (right.compareTo(o1.token) > 0 && right.compareTo(o2.token) > 0))
+                    {
+                        // both tokens are on the same side of the wrap point
+                        return o1.compareTo(o2);
+                    }
+                    return -o1.compareTo(o2);
+                }
+            };
+            Collections.sort(keys, comparator);
+        }
+        else
+        {
+            // unwrapped range (left < right).  standard sort is all we need.
+            Collections.sort(keys);
+        }
+    }
 }
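To illustrate the wrap-around ordering with hypothetical numeric tokens (not from the patch): for the wrapping range (80, 20], sampled keys with tokens {5, 10, 90, 95} come out of sortSampledKeys in ring order starting just past the left token, i.e. 90, 95, 5, 10, so the middle element of the sorted list is a sensible midpoint of the wrapped range.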


