incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [06/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:57:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/generated/TableStats.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/generated/TableStats.java b/blur-thrift/src/main/java/org/apache/blur/thrift/generated/TableStats.java
new file mode 100644
index 0000000..d1bc590
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/generated/TableStats.java
@@ -0,0 +1,848 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.blur.thrift.generated;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.IScheme;
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.SchemeFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.StandardScheme;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.TupleScheme;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TTupleProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolException;
+import org.apache.blur.thirdparty.thrift_0_9_0.EncodingUtils;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class TableStats implements org.apache.blur.thirdparty.thrift_0_9_0.TBase<TableStats, TableStats._Fields>, java.io.Serializable, Cloneable {
+  // Struct descriptor handed to writeStructBegin by the standard scheme.
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TStruct STRUCT_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TStruct("TableStats");
+
+  // One descriptor per field (wire name, wire type, Thrift field id); the standard scheme writes these as field headers.
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField TABLE_NAME_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("tableName", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING, (short)1);
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField BYTES_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("bytes", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64, (short)2);
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField RECORD_COUNT_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("recordCount", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64, (short)3);
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField ROW_COUNT_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("rowCount", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64, (short)4);
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField QUERIES_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("queries", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64, (short)5);
+
+  // Scheme factories keyed by scheme class; read() and write() look up the entry matching the protocol's getScheme().
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new TableStatsStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new TableStatsTupleSchemeFactory());
+  }
+
+  /**
+   * Thrift field 1 (string). A {@code null} value means the field is unset.
+   * Presumably the name of the table these statistics describe -- confirm against the Thrift IDL.
+   */
+  public String tableName; // required
+  /**
+   * Thrift field 2 (i64). Assigning this field directly does not mark it as set; {@link #setBytes(long)} does.
+   * Presumably the table's size in bytes -- confirm against the Thrift IDL.
+   */
+  public long bytes; // required
+  /**
+   * Thrift field 3 (i64). Assigning this field directly does not mark it as set; {@link #setRecordCount(long)} does.
+   * Presumably the number of records in the table -- confirm against the Thrift IDL.
+   */
+  public long recordCount; // required
+  /**
+   * Thrift field 4 (i64). Assigning this field directly does not mark it as set; {@link #setRowCount(long)} does.
+   * Presumably the number of rows in the table -- confirm against the Thrift IDL.
+   */
+  public long rowCount; // required
+  /**
+   * Thrift field 5 (i64). Assigning this field directly does not mark it as set; {@link #setQueries(long)} does.
+   * Presumably a count of queries against the table -- confirm against the Thrift IDL.
+   */
+  public long queries; // required
+
+  /** Enumerates the fields of this struct and offers lookups by Thrift field id and by field name. */
+  public enum _Fields implements org.apache.blur.thirdparty.thrift_0_9_0.TFieldIdEnum {
+    /** Field 1, {@code tableName}. */
+    TABLE_NAME((short)1, "tableName"),
+    /** Field 2, {@code bytes}. */
+    BYTES((short)2, "bytes"),
+    /** Field 3, {@code recordCount}. */
+    RECORD_COUNT((short)3, "recordCount"),
+    /** Field 4, {@code rowCount}. */
+    ROW_COUNT((short)4, "rowCount"),
+    /** Field 5, {@code queries}. */
+    QUERIES((short)5, "queries");
+
+    /** Field name to constant, filled once when the enum is initialized. */
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields constant : values()) {
+        byName.put(constant._fieldName, constant);
+      }
+    }
+
+    /**
+     * Returns the constant whose Thrift id equals {@code fieldId}, or {@code null} if there is none.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch (fieldId) {
+        case 1:
+          return TABLE_NAME;
+        case 2:
+          return BYTES;
+        case 3:
+          return RECORD_COUNT;
+        case 4:
+          return ROW_COUNT;
+        case 5:
+          return QUERIES;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Returns the constant whose Thrift id equals {@code fieldId}.
+     *
+     * @throws IllegalArgumentException if no constant has that id
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields match = findByThriftId(fieldId);
+      if (match != null) {
+        return match;
+      }
+      throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+    }
+
+    /**
+     * Returns the constant whose field name equals {@code name}, or {@code null} if there is none.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      this._thriftId = thriftId;
+      this._fieldName = fieldName;
+    }
+
+    /** Returns the Thrift field id of this constant. */
+    public short getThriftFieldId() {
+      return this._thriftId;
+    }
+
+    /** Returns the field name of this constant. */
+    public String getFieldName() {
+      return this._fieldName;
+    }
+  }
+
+  // isset id assignments: bit positions inside __isset_bitfield, one per primitive field.
+  // tableName has no bit because a null reference already marks it as unset.
+  private static final int __BYTES_ISSET_ID = 0;
+  private static final int __RECORDCOUNT_ISSET_ID = 1;
+  private static final int __ROWCOUNT_ISSET_ID = 2;
+  private static final int __QUERIES_ISSET_ID = 3;
+  // Records which primitive fields have been marked as set.
+  private byte __isset_bitfield = 0;
+  /** Unmodifiable map from each field to its metadata (name, requirement type, value type). */
+  public static final Map<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TABLE_NAME, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("tableName", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING)));
+    tmpMap.put(_Fields.BYTES, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("bytes", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64)));
+    tmpMap.put(_Fields.RECORD_COUNT, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("recordCount", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64)));
+    tmpMap.put(_Fields.ROW_COUNT, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("rowCount", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64)));
+    tmpMap.put(_Fields.QUERIES, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("queries", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    // Also register the map with FieldMetaData under this class.
+    org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData.addStructMetaDataMap(TableStats.class, metaDataMap);
+  }
+
+  /** Creates an instance with no field assigned: tableName is null and no isset bit is raised. */
+  public TableStats() {
+  }
+
+  /**
+   * Creates an instance with every field assigned. The four numeric fields are
+   * marked as set; tableName counts as set only when the argument is non-null.
+   */
+  public TableStats(String tableName, long bytes, long recordCount, long rowCount, long queries) {
+    this();
+    this.tableName = tableName;
+
+    // Store the numeric values first ...
+    this.bytes = bytes;
+    this.recordCount = recordCount;
+    this.rowCount = rowCount;
+    this.queries = queries;
+
+    // ... then raise their isset bits.
+    setBytesIsSet(true);
+    setRecordCountIsSet(true);
+    setRowCountIsSet(true);
+    setQueriesIsSet(true);
+  }
+
+  /**
+   * Copy constructor: duplicates the field values and isset bits of <i>other</i>.
+   * All fields are a String or primitives, so copying them yields an independent instance.
+   */
+  public TableStats(TableStats other) {
+    this.bytes = other.bytes;
+    this.recordCount = other.recordCount;
+    this.rowCount = other.rowCount;
+    this.queries = other.queries;
+    this.__isset_bitfield = other.__isset_bitfield;
+    if (other.isSetTableName()) {
+      this.tableName = other.tableName;
+    }
+  }
+
+  /** Returns a new instance built with the copy constructor from this one. */
+  public TableStats deepCopy() {
+    return new TableStats(this);
+  }
+
+  /** Resets this instance: tableName becomes null, numeric fields become 0 and their isset bits are cleared. */
+  @Override
+  public void clear() {
+    this.tableName = null;
+
+    this.bytes = 0;
+    this.recordCount = 0;
+    this.rowCount = 0;
+    this.queries = 0;
+
+    setBytesIsSet(false);
+    setRecordCountIsSet(false);
+    setRowCountIsSet(false);
+    setQueriesIsSet(false);
+  }
+
+  /**
+   * Returns the current tableName, or {@code null} when the field is unset.
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * Assigns tableName and returns this instance so calls can be chained.
+   * Passing {@code null} leaves the field unset.
+   */
+  public TableStats setTableName(String tableName) {
+    this.tableName = tableName;
+    return this;
+  }
+
+  /** Marks tableName as unset by nulling it. */
+  public void unsetTableName() {
+    this.tableName = null;
+  }
+
+  /** Returns true if field tableName is set, i.e. it currently holds a non-null value, and false otherwise. */
+  public boolean isSetTableName() {
+    return this.tableName != null;
+  }
+
+  /**
+   * Passing {@code false} nulls tableName; passing {@code true} changes nothing,
+   * because set-ness of this field is derived from the value being non-null.
+   */
+  public void setTableNameIsSet(boolean value) {
+    if (!value) {
+      this.tableName = null;
+    }
+  }
+
+  /**
+   * Returns the stored bytes value, whether or not the field is marked as set.
+   */
+  public long getBytes() {
+    return this.bytes;
+  }
+
+  /**
+   * Stores bytes, marks the field as set, and returns this instance so calls can be chained.
+   */
+  public TableStats setBytes(long bytes) {
+    this.bytes = bytes;
+    setBytesIsSet(true);
+    return this;
+  }
+
+  /** Clears the isset bit for bytes; the stored value itself is left untouched. */
+  public void unsetBytes() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __BYTES_ISSET_ID);
+  }
+
+  /** Returns true if the isset bit for field bytes is raised and false otherwise. */
+  public boolean isSetBytes() {
+    return EncodingUtils.testBit(__isset_bitfield, __BYTES_ISSET_ID);
+  }
+
+  /** Updates the isset bit for bytes according to {@code value}; the stored value is not modified. */
+  public void setBytesIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __BYTES_ISSET_ID, value);
+  }
+
+  /**
+   * Returns the stored recordCount value, whether or not the field is marked as set.
+   */
+  public long getRecordCount() {
+    return this.recordCount;
+  }
+
+  /**
+   * Stores recordCount, marks the field as set, and returns this instance so calls can be chained.
+   */
+  public TableStats setRecordCount(long recordCount) {
+    this.recordCount = recordCount;
+    setRecordCountIsSet(true);
+    return this;
+  }
+
+  /** Clears the isset bit for recordCount; the stored value itself is left untouched. */
+  public void unsetRecordCount() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __RECORDCOUNT_ISSET_ID);
+  }
+
+  /** Returns true if the isset bit for field recordCount is raised and false otherwise. */
+  public boolean isSetRecordCount() {
+    return EncodingUtils.testBit(__isset_bitfield, __RECORDCOUNT_ISSET_ID);
+  }
+
+  /** Updates the isset bit for recordCount according to {@code value}; the stored value is not modified. */
+  public void setRecordCountIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RECORDCOUNT_ISSET_ID, value);
+  }
+
+  /**
+   * Returns the stored rowCount value, whether or not the field is marked as set.
+   */
+  public long getRowCount() {
+    return this.rowCount;
+  }
+
+  /**
+   * Stores rowCount, marks the field as set, and returns this instance so calls can be chained.
+   */
+  public TableStats setRowCount(long rowCount) {
+    this.rowCount = rowCount;
+    setRowCountIsSet(true);
+    return this;
+  }
+
+  /** Clears the isset bit for rowCount; the stored value itself is left untouched. */
+  public void unsetRowCount() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ROWCOUNT_ISSET_ID);
+  }
+
+  /** Returns true if the isset bit for field rowCount is raised and false otherwise. */
+  public boolean isSetRowCount() {
+    return EncodingUtils.testBit(__isset_bitfield, __ROWCOUNT_ISSET_ID);
+  }
+
+  /** Updates the isset bit for rowCount according to {@code value}; the stored value is not modified. */
+  public void setRowCountIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ROWCOUNT_ISSET_ID, value);
+  }
+
+  /**
+   * Returns the stored queries value, whether or not the field is marked as set.
+   */
+  public long getQueries() {
+    return this.queries;
+  }
+
+  /**
+   * Stores queries, marks the field as set, and returns this instance so calls can be chained.
+   */
+  public TableStats setQueries(long queries) {
+    this.queries = queries;
+    setQueriesIsSet(true);
+    return this;
+  }
+
+  /** Clears the isset bit for queries; the stored value itself is left untouched. */
+  public void unsetQueries() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __QUERIES_ISSET_ID);
+  }
+
+  /** Returns true if the isset bit for field queries is raised and false otherwise. */
+  public boolean isSetQueries() {
+    return EncodingUtils.testBit(__isset_bitfield, __QUERIES_ISSET_ID);
+  }
+
+  /** Updates the isset bit for queries according to {@code value}; the stored value is not modified. */
+  public void setQueriesIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __QUERIES_ISSET_ID, value);
+  }
+
+  /**
+   * Generic setter: assigns {@code value} to the given field, or unsets the field
+   * when {@code value} is {@code null}.
+   *
+   * @param field which field to change
+   * @param value a String for TABLE_NAME, a Long for every other field, or null to unset
+   * @throws ClassCastException if {@code value} has the wrong type for the field
+   */
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case TABLE_NAME:
+      if (value != null) {
+        setTableName((String)value);
+      } else {
+        unsetTableName();
+      }
+      break;
+
+    case BYTES:
+      if (value != null) {
+        setBytes((Long)value);
+      } else {
+        unsetBytes();
+      }
+      break;
+
+    case RECORD_COUNT:
+      if (value != null) {
+        setRecordCount((Long)value);
+      } else {
+        unsetRecordCount();
+      }
+      break;
+
+    case ROW_COUNT:
+      if (value != null) {
+        setRowCount((Long)value);
+      } else {
+        unsetRowCount();
+      }
+      break;
+
+    case QUERIES:
+      if (value != null) {
+        setQueries((Long)value);
+      } else {
+        unsetQueries();
+      }
+      break;
+
+    }
+  }
+
+  /**
+   * Generic getter: returns the value of the given field, boxing numeric fields as Long.
+   *
+   * @throws IllegalStateException if {@code field} is not one of the handled constants
+   */
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case TABLE_NAME:
+      return getTableName();
+    case BYTES:
+      return Long.valueOf(getBytes());
+    case RECORD_COUNT:
+      return Long.valueOf(getRecordCount());
+    case ROW_COUNT:
+      return Long.valueOf(getRowCount());
+    case QUERIES:
+      return Long.valueOf(getQueries());
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Reports whether the given field is currently set.
+   *
+   * @throws IllegalArgumentException if {@code field} is null
+   * @throws IllegalStateException if {@code field} is not one of the handled constants
+   */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case TABLE_NAME:
+      return isSetTableName();
+    case BYTES:
+      return isSetBytes();
+    case RECORD_COUNT:
+      return isSetRecordCount();
+    case ROW_COUNT:
+      return isSetRowCount();
+    case QUERIES:
+      return isSetQueries();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /** Returns true only when {@code that} is a TableStats that the typed {@code equals} overload accepts. */
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is false for null, so no separate null check is needed.
+    return (that instanceof TableStats) && this.equals((TableStats)that);
+  }
+
+  /**
+   * Field-by-field equality. tableName must be set on both sides with equal
+   * values, or unset on both sides. The four numeric fields are compared by
+   * value only; their isset bits are not consulted.
+   */
+  public boolean equals(TableStats that) {
+    if (that == null) {
+      return false;
+    }
+
+    final boolean thisHasName = this.isSetTableName();
+    final boolean thatHasName = that.isSetTableName();
+    if (thisHasName != thatHasName) {
+      return false;
+    }
+    if (thisHasName && !this.tableName.equals(that.tableName)) {
+      return false;
+    }
+
+    return this.bytes == that.bytes
+        && this.recordCount == that.recordCount
+        && this.rowCount == that.rowCount
+        && this.queries == that.queries;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  /**
+   * Orders instances field by field in Thrift id order. For each field, an
+   * unset value sorts before a set one; when both sides are set the values are
+   * compared. Instances of different runtime classes are ordered by class name.
+   *
+   * NOTE(review): this ordering looks at the isset bits of the numeric fields,
+   * whereas equals(TableStats) compares their values only, so two instances can
+   * be equal and still compare as non-zero.
+   */
+  public int compareTo(TableStats other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int cmp;
+
+    // tableName
+    cmp = Boolean.valueOf(isSetTableName()).compareTo(other.isSetTableName());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (isSetTableName()) {
+      cmp = org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.tableName, other.tableName);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // bytes
+    cmp = Boolean.valueOf(isSetBytes()).compareTo(other.isSetBytes());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (isSetBytes()) {
+      cmp = org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.bytes, other.bytes);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // recordCount
+    cmp = Boolean.valueOf(isSetRecordCount()).compareTo(other.isSetRecordCount());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (isSetRecordCount()) {
+      cmp = org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.recordCount, other.recordCount);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // rowCount
+    cmp = Boolean.valueOf(isSetRowCount()).compareTo(other.isSetRowCount());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (isSetRowCount()) {
+      cmp = org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.rowCount, other.rowCount);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // queries
+    cmp = Boolean.valueOf(isSetQueries()).compareTo(other.isSetQueries());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (isSetQueries()) {
+      cmp = org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.queries, other.queries);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    return 0;
+  }
+
+  /** Returns the _Fields constant for the given Thrift field id, or null when the id is unknown. */
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  /**
+   * Populates this struct from {@code iprot} using the scheme registered for the
+   * protocol's scheme class. A scheme class missing from the registry makes the
+   * lookup return null and the call fail with a NullPointerException.
+   */
+  public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol iprot) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  /**
+   * Serializes this struct to {@code oprot} using the scheme registered for the
+   * protocol's scheme class. A scheme class missing from the registry makes the
+   * lookup return null and the call fail with a NullPointerException.
+   */
+  public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol oprot) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  /**
+   * Renders all five fields as {@code TableStats(name:value, ...)}. An unset
+   * tableName prints as the text "null"; numeric fields print their stored
+   * value whether or not they are marked as set.
+   */
+  @Override
+  public String toString() {
+    StringBuilder out = new StringBuilder("TableStats(");
+
+    out.append("tableName:");
+    out.append(this.tableName == null ? "null" : this.tableName);
+
+    out.append(", ");
+    out.append("bytes:");
+    out.append(this.bytes);
+
+    out.append(", ");
+    out.append("recordCount:");
+    out.append(this.recordCount);
+
+    out.append(", ");
+    out.append("rowCount:");
+    out.append(this.rowCount);
+
+    out.append(", ");
+    out.append("queries:");
+    out.append(this.queries);
+
+    out.append(")");
+    return out.toString();
+  }
+
+  /**
+   * Validation hook invoked by the standard scheme before writing and after reading.
+   * This struct currently performs no checks: the body holds only placeholder comments.
+   */
+  public void validate() throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  /**
+   * Java serialization hook: writes this struct to the object stream with
+   * TCompactProtocol. A TException is rethrown wrapped in an IOException.
+   */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol(new org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport(out)));
+    } catch (org.apache.blur.thirdparty.thrift_0_9_0.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /**
+   * Java serialization hook: resets the isset bits, then reads this struct from
+   * the object stream with TCompactProtocol. A TException is rethrown wrapped in
+   * an IOException.
+   */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol(new org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport(in)));
+    } catch (org.apache.blur.thirdparty.thrift_0_9_0.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /** Factory registered under StandardScheme; hands out a new TableStatsStandardScheme on every call. */
+  private static class TableStatsStandardSchemeFactory implements SchemeFactory {
+    public TableStatsStandardScheme getScheme() {
+      return new TableStatsStandardScheme();
+    }
+  }
+
+  /** Field-tagged serialization of TableStats: every value is preceded by a field header. */
+  private static class TableStatsStandardScheme extends StandardScheme<TableStats> {
+
+    /**
+     * Reads field headers until a STOP marker. A known field id with the expected
+     * wire type is stored in {@code struct} and marked as set; a known id with an
+     * unexpected type, or an unknown id, is skipped. Calls validate() at the end.
+     */
+    public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol iprot, TableStats struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        // STOP marks the end of the struct's field list.
+        if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TABLE_NAME
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING) {
+              struct.tableName = iprot.readString();
+              struct.setTableNameIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // BYTES
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64) {
+              struct.bytes = iprot.readI64();
+              struct.setBytesIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // RECORD_COUNT
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64) {
+              struct.recordCount = iprot.readI64();
+              struct.setRecordCountIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 4: // ROW_COUNT
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64) {
+              struct.rowCount = iprot.readI64();
+              struct.setRowCountIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 5: // QUERIES
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.I64) {
+              struct.queries = iprot.readI64();
+              struct.setQueriesIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            // Unknown field id: skip its payload.
+            org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    /**
+     * Calls validate(), then writes the struct. tableName is written only when it
+     * is non-null; the four i64 fields are always written, whatever their isset bits.
+     */
+    public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol oprot, TableStats struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.tableName != null) {
+        oprot.writeFieldBegin(TABLE_NAME_FIELD_DESC);
+        oprot.writeString(struct.tableName);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(BYTES_FIELD_DESC);
+      oprot.writeI64(struct.bytes);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(RECORD_COUNT_FIELD_DESC);
+      oprot.writeI64(struct.recordCount);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(ROW_COUNT_FIELD_DESC);
+      oprot.writeI64(struct.rowCount);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(QUERIES_FIELD_DESC);
+      oprot.writeI64(struct.queries);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  /** Factory registered under TupleScheme; hands out a new TableStatsTupleScheme on every call. */
+  private static class TableStatsTupleSchemeFactory implements SchemeFactory {
+    public TableStatsTupleScheme getScheme() {
+      return new TableStatsTupleScheme();
+    }
+  }
+
+  /**
+   * Tuple serialization of TableStats: a 5-bit presence set followed by the
+   * values of the fields that are set, in field order, without field headers.
+   * Both methods cast the protocol to TTupleProtocol.
+   */
+  private static class TableStatsTupleScheme extends TupleScheme<TableStats> {
+
+    /** Writes the presence bits (bit i = i-th field is set) and then each set field's value. */
+    @Override
+    public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol prot, TableStats struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      // First pass: record which fields are set.
+      BitSet optionals = new BitSet();
+      if (struct.isSetTableName()) {
+        optionals.set(0);
+      }
+      if (struct.isSetBytes()) {
+        optionals.set(1);
+      }
+      if (struct.isSetRecordCount()) {
+        optionals.set(2);
+      }
+      if (struct.isSetRowCount()) {
+        optionals.set(3);
+      }
+      if (struct.isSetQueries()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
+      // Second pass: emit the values in the same order as the bits above.
+      if (struct.isSetTableName()) {
+        oprot.writeString(struct.tableName);
+      }
+      if (struct.isSetBytes()) {
+        oprot.writeI64(struct.bytes);
+      }
+      if (struct.isSetRecordCount()) {
+        oprot.writeI64(struct.recordCount);
+      }
+      if (struct.isSetRowCount()) {
+        oprot.writeI64(struct.rowCount);
+      }
+      if (struct.isSetQueries()) {
+        oprot.writeI64(struct.queries);
+      }
+    }
+
+    /** Reads the 5 presence bits, then reads and marks as set each field whose bit is on. */
+    @Override
+    public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol prot, TableStats struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(5);
+      if (incoming.get(0)) {
+        struct.tableName = iprot.readString();
+        struct.setTableNameIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.bytes = iprot.readI64();
+        struct.setBytesIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.recordCount = iprot.readI64();
+        struct.setRecordCountIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.rowCount = iprot.readI64();
+        struct.setRowCountIsSet(true);
+      }
+      if (incoming.get(4)) {
+        struct.queries = iprot.readI64();
+        struct.setQueriesIsSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
new file mode 100644
index 0000000..b47e090
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer;
+import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer.AbstractServerArgs;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TMemoryInputTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides common methods and classes used by nonblocking TServer
+ * implementations.
+ */
+public abstract class AbstractNonblockingServer extends TServer {
+  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+  /**
+   * Base arguments for nonblocking servers. Adds a limit on read-buffer memory
+   * and installs a {@link TFramedTransport.Factory} as the transport factory.
+   */
+  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+    /** Limit on bytes allocated to client read buffers; defaults to Long.MAX_VALUE. */
+    public long maxReadBufferBytes = Long.MAX_VALUE;
+
+    /**
+     * @param transport the nonblocking server transport to accept connections on
+     */
+    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
+      super(transport);
+      // FrameBuffer reads a 4-byte frame size first, so use framed transports
+      transportFactory(new TFramedTransport.Factory());
+    }
+  }
+
+  /**
+   * The maximum amount of memory we will allocate to client IO buffers at a
+   * time. Without this limit, the server will gladly allocate client buffers
+   * right into an out of memory exception, rather than waiting.
+   */
+  private final long MAX_READ_BUFFER_BYTES;
+
+  /**
+   * How many bytes are currently allocated to read buffers.
+   */
+  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+
+  /**
+   * Creates the server and captures the read-buffer limit from {@code args}.
+   */
+  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
+    super(args);
+    // copied once here; the field is final and used by every FrameBuffer
+    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
+  }
+
+  /**
+   * Runs the server: starts the IO threads, begins listening, then blocks until
+   * shutdown. Returns early, without serving, if either startup step fails.
+   */
+  public void serve() {
+    // IO threads first, then the listen socket; listening is not attempted
+    // when the threads could not be started
+    final boolean started = startThreads() && startListening();
+    if (!started) {
+      return;
+    }
+
+    setServing(true);
+    // blocks for as long as we are serving
+    waitForShutdown();
+    setServing(false);
+
+    // serving is over: close the server transport
+    stopListening();
+  }
+
+  /**
+   * Starts any threads required for serving.
+   * 
+   * @return true if everything went ok, false if threads could not be started.
+   */
+  protected abstract boolean startThreads();
+
+  /**
+   * A method that will block until the threads handling the serving have been
+   * shut down.
+   */
+  protected abstract void waitForShutdown();
+
+  /**
+   * Asks the server transport to start listening for connections.
+   * 
+   * @return true if listening started, false if the transport threw a
+   *         TTransportException (which is logged).
+   */
+  protected boolean startListening() {
+    boolean listening;
+    try {
+      serverTransport_.listen();
+      listening = true;
+    } catch (TTransportException listenFailure) {
+      LOGGER.error("Failed to start listening on server socket!", listenFailure);
+      listening = false;
+    }
+    return listening;
+  }
+
+  /**
+   * Stop listening for connections by closing the server transport.
+   */
+  protected void stopListening() {
+    serverTransport_.close();
+  }
+
+  /**
+   * Perform an invocation. This method could behave several different ways -
+   * invoke immediately inline, queue for separate execution, etc.
+   * 
+   * @return true if invocation was successfully requested, which is not a
+   *         guarantee that invocation has completed. False if the request
+   *         failed.
+   */
+  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
+
+  /**
+   * An abstract thread that handles selecting on a set of transports and
+   * {@link FrameBuffer FrameBuffers} associated with selected keys
+   * corresponding to requests.
+   */
+  protected abstract class AbstractSelectThread extends Thread {
+    protected final Selector selector;
+
+    // List of FrameBuffers that want to change their selection interests.
+    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+    public AbstractSelectThread() throws IOException {
+      this.selector = SelectorProvider.provider().openSelector();
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      selector.wakeup();
+    }
+
+    /**
+     * Add FrameBuffer to the list of select interest changes and wake up the
+     * selector if it's blocked. When the select() call exits, it'll give the
+     * FrameBuffer a chance to change its interests.
+     */
+    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+      synchronized (selectInterestChanges) {
+        selectInterestChanges.add(frameBuffer);
+      }
+      // wakeup the selector, if it's currently blocked.
+      selector.wakeup();
+    }
+
+    /**
+     * Check to see if there are any FrameBuffers that have switched their
+     * interest type from read to write or vice versa.
+     */
+    protected void processInterestChanges() {
+      synchronized (selectInterestChanges) {
+        for (FrameBuffer fb : selectInterestChanges) {
+          fb.changeSelectInterests();
+        }
+        selectInterestChanges.clear();
+      }
+    }
+
+    /**
+     * Do the work required to read from a readable client. If the frame is
+     * fully read, then invoke the method call.
+     */
+    protected void handleRead(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.read()) {
+        cleanupSelectionKey(key);
+        return;
+      }
+
+      // if the buffer's frame read is complete, invoke the method.
+      if (buffer.isFrameFullyRead()) {
+        if (!requestInvoke(buffer)) {
+          cleanupSelectionKey(key);
+        }
+      }
+    }
+
+    /**
+     * Let a writable client get written, if there's data to be written.
+     */
+    protected void handleWrite(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.write()) {
+        cleanupSelectionKey(key);
+      }
+    }
+
+    /**
+     * Do connection-close cleanup on a given SelectionKey.
+     */
+    protected void cleanupSelectionKey(SelectionKey key) {
+      // remove the records from the two maps
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (buffer != null) {
+        // close the buffer
+        buffer.close();
+      }
+      // cancel the selection key
+      key.cancel();
+    }
+  } // SelectThread
+
+  /**
+   * Possible states for the FrameBuffer state machine. The notes in brackets
+   * name the FrameBuffer method in this file that moves into each state.
+   */
+  private enum FrameBufferState {
+    // in the midst of reading the frame size off the wire
+    // [initial state; re-entered by prepareRead()]
+    READING_FRAME_SIZE,
+    // reading the actual frame data now, but not all the way done yet
+    // [set by read() once the frame size is known]
+    READING_FRAME,
+    // completely read the frame, so an invocation can now happen
+    // [set by read() when the frame buffer is full]
+    READ_FRAME_COMPLETE,
+    // waiting to get switched to listening for write events
+    // [set by responseReady() when there is response data]
+    AWAITING_REGISTER_WRITE,
+    // started writing response data, not fully complete yet
+    // [set by changeSelectInterests()]
+    WRITING,
+    // another thread wants this framebuffer to go back to reading
+    // [set by responseReady() when the response is empty]
+    AWAITING_REGISTER_READ,
+    // we want our transport and selection key invalidated in the selector
+    // thread [set by invoke() when processing throws]
+    AWAITING_CLOSE
+  }
+
+  /**
+   * Class that implements a sort of state machine around the interaction with a
+   * client and an invoker. It manages reading the frame size and frame data,
+   * getting it handed off as wrapped transports, and then the writing of
+   * response data back to the client. In the process it manages flipping the
+   * read and write bits on the selection key for its client.
+   */
+  protected class FrameBuffer {
+    // the actual transport hooked up to the client.
+    public final TNonblockingTransport trans_;
+
+    // the SelectionKey that corresponds to our transport
+    private final SelectionKey selectionKey_;
+
+    // the SelectThread that owns the registration of our transport
+    private final AbstractSelectThread selectThread_;
+
+    // where in the process of reading/writing are we?
+    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+
+    // the ByteBuffer we'll be using to write and read, depending on the state
+    private ByteBuffer buffer_;
+
+    private final TByteArrayOutputStream response_;
+    
+    // the frame that the TTransport should wrap.
+    private final TMemoryInputTransport frameTrans_;
+    
+    // the transport that should be used to connect to clients
+    private final TTransport inTrans_;
+    
+    private final TTransport outTrans_;
+    
+    // the input protocol to use on frames
+    private final TProtocol inProt_;
+    
+    // the output protocol to use on frames
+    private final TProtocol outProt_;
+    
+    // context associated with this connection
+    private final ServerContext context_;
+
+    /**
+     * Binds a client transport to its selection key and owning select thread,
+     * and builds the transports and protocols used to process its frames. A
+     * server context is created when an event handler is installed.
+     */
+    public FrameBuffer(final TNonblockingTransport trans,
+        final SelectionKey selectionKey,
+        final AbstractSelectThread selectThread) {
+      this.trans_ = trans;
+      this.selectionKey_ = selectionKey;
+      this.selectThread_ = selectThread;
+
+      // start out with room for just the 4-byte frame size
+      this.buffer_ = ByteBuffer.allocate(4);
+
+      // input side: frame bytes -> transport -> protocol
+      this.frameTrans_ = new TMemoryInputTransport();
+      // output side: protocol -> transport -> in-memory response
+      this.response_ = new TByteArrayOutputStream();
+      this.inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
+      this.outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+      this.inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
+      this.outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
+
+      this.context_ = (eventHandler_ == null) ? null : eventHandler_.createContext(inProt_, outProt_);
+    }
+
+    /**
+     * Give this FrameBuffer a chance to read. The selector loop should have
+     * received a read event for this FrameBuffer.
+     * 
+     * First the 4-byte frame size is read; once it is complete a buffer of
+     * {@code frameSize + 4} bytes is allocated (size prefix plus payload) and
+     * the frame itself is read into it.
+     * 
+     * @return true if the connection should live on, false if it should be
+     *         closed
+     */
+    public boolean read() {
+      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
+        // try to read the frame size completely
+        if (!internalRead()) {
+          return false;
+        }
+
+        // the size header has not fully arrived yet: we cannot move on to
+        // READING_FRAME, so wait for the next read event.
+        if (buffer_.remaining() != 0) {
+          return true;
+        }
+
+        // pull out the frame size as an integer.
+        int frameSize = buffer_.getInt(0);
+        if (frameSize <= 0) {
+          LOGGER.error("Read an invalid frame size of " + frameSize
+              + ". Are you using TFramedTransport on the client side?");
+          return false;
+        }
+
+        // the buffer also holds the 4-byte size prefix. If frameSize + 4 does
+        // not fit in an int the sum wraps negative, which would corrupt the
+        // memory accounting and make ByteBuffer.allocate() throw in the select
+        // thread. Reject such frames and close the connection instead.
+        if (frameSize > Integer.MAX_VALUE - 4) {
+          LOGGER.error("Read a frame size of " + frameSize
+              + ", which is too large to buffer together with its 4 byte size header.");
+          return false;
+        }
+
+        // if this frame will always be too large for this server, log the
+        // error and close the connection.
+        if (frameSize > MAX_READ_BUFFER_BYTES) {
+          LOGGER.error("Read a frame size of " + frameSize
+              + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+          return false;
+        }
+
+        // if this frame will push us over the memory limit, then return.
+        // with luck, more memory will free up the next time around.
+        if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
+          return true;
+        }
+
+        // safe from overflow thanks to the guard above
+        final int bufferSize = frameSize + 4;
+
+        // increment the amount of memory allocated to read buffers
+        readBufferBytesAllocated.addAndGet(bufferSize);
+
+        // reallocate the readbuffer as a frame-sized buffer that starts with
+        // the size prefix we already consumed
+        buffer_ = ByteBuffer.allocate(bufferSize);
+        buffer_.putInt(frameSize);
+
+        state_ = FrameBufferState.READING_FRAME;
+      }
+
+      // it is possible to fall through from the READING_FRAME_SIZE section
+      // to READING_FRAME if there's already some frame data available once
+      // READING_FRAME_SIZE is complete.
+
+      if (state_ == FrameBufferState.READING_FRAME) {
+        if (!internalRead()) {
+          return false;
+        }
+
+        // since we're already in the select loop here for sure, we can just
+        // modify our selection key directly.
+        if (buffer_.remaining() == 0) {
+          // get rid of the read select interests
+          selectionKey_.interestOps(0);
+          state_ = FrameBufferState.READ_FRAME_COMPLETE;
+        }
+
+        return true;
+      }
+
+      // if we fall through to this point, then the state must be invalid.
+      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to write its output to the final client.
+     */
+    public boolean write() {
+      if (state_ == FrameBufferState.WRITING) {
+        try {
+          if (trans_.write(buffer_) < 0) {
+            return false;
+          }
+        } catch (IOException e) {
+          LOGGER.warn("Got an IOException during write!", e);
+          return false;
+        }
+
+        // we're done writing. now we need to switch back to reading.
+        if (buffer_.remaining() == 0) {
+          prepareRead();
+        }
+        return true;
+      }
+
+      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to set its interest to write, once data
+     * has come in.
+     */
+    public void changeSelectInterests() {
+      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
+        // set the OP_WRITE interest
+        selectionKey_.interestOps(SelectionKey.OP_WRITE);
+        state_ = FrameBufferState.WRITING;
+      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
+        prepareRead();
+      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
+        close();
+        selectionKey_.cancel();
+      } else {
+        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
+      }
+    }
+
+    /**
+     * Shut the connection down: release any read-buffer bytes still counted
+     * against the server-wide limit, close the client transport and, if an
+     * event handler is installed, delete this connection's context.
+     */
+    public void close() {
+      // The read buffer is still counted while a frame is being read or is
+      // waiting to be invoked. It is also still counted when invoke() failed
+      // and moved us to AWAITING_CLOSE without responseReady() having released
+      // it; without that case the bytes would stay counted forever.
+      // NOTE(review): if a failure happens inside responseReady() after it has
+      // already released the bytes, this subtracts a second time - confirm
+      // whether that path is reachable in practice.
+      final boolean readBufferCounted = state_ == FrameBufferState.READING_FRAME
+          || state_ == FrameBufferState.READ_FRAME_COMPLETE
+          || state_ == FrameBufferState.AWAITING_CLOSE;
+      // buffer_ is set to null by responseReady() for empty responses
+      if (readBufferCounted && buffer_ != null) {
+        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+      }
+      trans_.close();
+      if (eventHandler_ != null) {
+        eventHandler_.deleteContext(context_, inProt_, outProt_);
+      }
+    }
+
+    /**
+     * Check if this FrameBuffer has a full frame read.
+     * 
+     * @return true when the state is READ_FRAME_COMPLETE
+     */
+    public boolean isFrameFullyRead() {
+      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
+    }
+
+    /**
+     * Called by whatever thread ran the invocation once the processor has
+     * finished. Releases the read buffer from the memory accounting, then
+     * either queues the response for writing or, when the response is empty,
+     * sends this FrameBuffer straight back to reading.
+     */
+    public void responseReady() {
+      // the request bytes are no longer needed; give them back to the shared
+      // budget right away (rather than waiting for close) so other clients
+      // can use the memory.
+      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+
+      final int responseLength = response_.len();
+      if (responseLength != 0) {
+        buffer_ = ByteBuffer.wrap(response_.get(), 0, responseLength);
+
+        // we may not be on the select thread (it could be blocked in select()
+        // while a worker runs this), so the switch to write interest is
+        // requested through requestSelectInterestChange() below.
+        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
+      } else {
+        // nothing to send back - this was probably a oneway method
+        state_ = FrameBufferState.AWAITING_REGISTER_READ;
+        buffer_ = null;
+      }
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Actually invoke the method signified by this FrameBuffer. On success the
+     * response is handed to {@link #responseReady()}; if anything is thrown the
+     * FrameBuffer is marked AWAITING_CLOSE and an interest change is requested
+     * so the connection gets closed.
+     */
+    public void invoke() {
+      // expose the bytes of the read buffer to the input transport and start
+      // with an empty response
+      frameTrans_.reset(buffer_.array());
+      response_.reset();
+      
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.processContext(context_, inTrans_, outTrans_);
+        }
+        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
+        // still inside the try: a throwable from responseReady() is also
+        // handled by the catch blocks below
+        responseReady();
+        return;
+      } catch (TException te) {
+        LOGGER.warn("Exception while invoking!", te);
+      } catch (Throwable t) {
+        LOGGER.error("Unexpected throwable while invoking!", t);
+      }
+      // This will only be reached when there is a throwable.
+      state_ = FrameBufferState.AWAITING_CLOSE;
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Reads whatever is available from the client transport into the current
+     * buffer.
+     * 
+     * @return true if the read succeeded, false if the transport returned a
+     *         negative count or threw an IOException.
+     */
+    private boolean internalRead() {
+      try {
+        // a negative count is treated as "stop using this connection"
+        return trans_.read(buffer_) >= 0;
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException in internalRead!", e);
+        return false;
+      }
+    }
+
+    /**
+     * We're done writing (or there was nothing to write), so reset our interest
+     * ops to OP_READ, allocate a fresh 4-byte buffer for the next frame size
+     * and go back to READING_FRAME_SIZE.
+     */
+    private void prepareRead() {
+      // we can set our interest directly without using the queue because
+      // we're in the select thread.
+      selectionKey_.interestOps(SelectionKey.OP_READ);
+      // get ready for another go-around
+      buffer_ = ByteBuffer.allocate(4);
+      state_ = FrameBufferState.READING_FRAME_SIZE;
+    }
+
+    /**
+     * When this FrameBuffer needs to change its select interests and execution
+     * might not be in its select thread, then this method will make sure the
+     * interest change gets done when the select thread wakes back up. When the
+     * current thread is this FrameBuffer's select thread, then it just does the
+     * interest change immediately.
+     */
+    private void requestSelectInterestChange() {
+      if (Thread.currentThread() == this.selectThread_) {
+        changeSelectInterests();
+      } else {
+        this.selectThread_.requestSelectInterestChange(this);
+      }
+    }
+  } // FrameBuffer
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
new file mode 100644
index 0000000..57cc085
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
@@ -0,0 +1,20 @@
+package org.apache.blur.thrift.server;
+
+import org.apache.blur.thrift.server.AbstractNonblockingServer.FrameBuffer;
+
+/**
+ * A unit of work for a worker thread: running it invokes the request held by
+ * the wrapped {@link FrameBuffer}.
+ */
+class Invocation implements Runnable {
+  // the FrameBuffer holding the fully read request
+  private final FrameBuffer target;
+
+  public Invocation(final FrameBuffer frameBuffer) {
+    target = frameBuffer;
+  }
+
+  /**
+   * Delegates to {@link FrameBuffer#invoke()}.
+   */
+  public void run() {
+    target.invoke();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
new file mode 100644
index 0000000..b58b063
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Half-Sync/Half-Async server with a separate pool of threads to handle
+ * non-blocking I/O. Accepts are handled on a single thread, and a configurable
+ * number of nonblocking selector threads manage reading and writing of client
+ * connections. A synchronous worker thread pool handles processing of requests.
+ * 
+ * Performs better than TNonblockingServer/THsHaServer in multi-core
+ * environments when the bottleneck is CPU on the single selector thread
+ * handling I/O. In addition, because the accept handling is decoupled from
+ * reads/writes and invocation, the server has better ability to handle back-
+ * pressure from new connections (e.g. stop accepting when busy).
+ * 
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class TThreadedSelectorServer extends AbstractNonblockingServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
+
+  private static int selectorThreadCount;
+
+  /**
+   * Configuration for {@link TThreadedSelectorServer}. Setters return
+   * {@code this} so calls can be chained; {@link #validate()} checks the thread
+   * and queue sizes.
+   */
+  public static class Args extends AbstractNonblockingServerArgs<Args> {
+
+    /** The number of threads for selecting on already-accepted connections */
+    public int selectorThreads = 2;
+    /**
+     * The size of the executor service (if none is specified) that will handle
+     * invocations. This may be set to 0, in which case invocations will be
+     * handled directly on the selector threads (as is in TNonblockingServer)
+     */
+    private int workerThreads = 5;
+    /** Time to wait for server to stop gracefully */
+    private int stopTimeoutVal = 60;
+    /** Unit of {@link #stopTimeoutVal}; seconds by default */
+    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    /** The ExecutorService for handling dispatched requests */
+    private ExecutorService executorService = null;
+    /**
+     * The size of the blocking queue per selector thread for passing accepted
+     * connections to the selector thread
+     */
+    private int acceptQueueSizePerThread = 4;
+
+    /**
+     * Determines the strategy for handling new accepted connections.
+     */
+    public static enum AcceptPolicy {
+      /**
+       * Require accepted connection registration to be handled by the executor.
+       * If the worker pool is saturated, further accepts will be closed
+       * immediately. Slightly increases latency due to an extra scheduling.
+       */
+      FAIR_ACCEPT,
+      /**
+       * Handle the accepts as fast as possible, disregarding the status of the
+       * executor service.
+       */
+      FAST_ACCEPT
+    }
+
+    /** Accept strategy; FAST_ACCEPT by default */
+    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
+
+    /**
+     * @param transport the nonblocking server transport to accept on
+     */
+    public Args(TNonblockingServerTransport transport) {
+      super(transport);
+    }
+
+    /** Sets the number of selector threads. */
+    public Args selectorThreads(int i) {
+      selectorThreads = i;
+      return this;
+    }
+
+    /** @return the number of selector threads */
+    public int getSelectorThreads() {
+      return selectorThreads;
+    }
+
+    /** Sets the number of worker threads; 0 is accepted by validate(). */
+    public Args workerThreads(int i) {
+      workerThreads = i;
+      return this;
+    }
+
+    /** @return the number of worker threads */
+    public int getWorkerThreads() {
+      return workerThreads;
+    }
+
+    /** @return the stop timeout, in units of getStopTimeoutUnit() */
+    public int getStopTimeoutVal() {
+      return stopTimeoutVal;
+    }
+
+    /** Sets the stop timeout value. */
+    public Args stopTimeoutVal(int stopTimeoutVal) {
+      this.stopTimeoutVal = stopTimeoutVal;
+      return this;
+    }
+
+    /** @return the unit of the stop timeout */
+    public TimeUnit getStopTimeoutUnit() {
+      return stopTimeoutUnit;
+    }
+
+    /** Sets the unit of the stop timeout. */
+    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+      this.stopTimeoutUnit = stopTimeoutUnit;
+      return this;
+    }
+
+    /** @return the configured executor service, or null if none was set */
+    public ExecutorService getExecutorService() {
+      return executorService;
+    }
+
+    /** Sets the executor service used for dispatched requests. */
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+
+    /** @return the accept queue size per selector thread */
+    public int getAcceptQueueSizePerThread() {
+      return acceptQueueSizePerThread;
+    }
+
+    /** Sets the accept queue size per selector thread. */
+    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
+      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
+      return this;
+    }
+
+    /** @return the accept policy */
+    public AcceptPolicy getAcceptPolicy() {
+      return acceptPolicy;
+    }
+
+    /** Sets the accept policy. */
+    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
+      this.acceptPolicy = acceptPolicy;
+      return this;
+    }
+
+    /**
+     * Checks the configured sizes.
+     * 
+     * @throws IllegalArgumentException if selectorThreads or
+     *         acceptQueueSizePerThread is not positive, or workerThreads is
+     *         negative
+     */
+    public void validate() {
+      if (selectorThreads <= 0) {
+        throw new IllegalArgumentException("selectorThreads must be positive.");
+      }
+      if (workerThreads < 0) {
+        throw new IllegalArgumentException("workerThreads must be non-negative.");
+      }
+      if (acceptQueueSizePerThread <= 0) {
+        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
+      }
+    }
+  }
+
+  // Flag for stopping the server
+  // Please see THRIFT-1795 for the usage of this flag
+  private volatile boolean stopped_ = false;
+
+  // The thread handling all accepts
+  private AcceptThread acceptThread;
+
+  // Threads handling events on client transports
+  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
+
+  // This wraps all the functionality of queueing and thread pool management
+  // for the passing of Invocations from the selector thread(s) to the workers
+  // (if any).
+  private final ExecutorService invoker;
+
+  private final Args args;
+
+  /**
+   * Create the server with the specified Args configuration
+   */
+  public TThreadedSelectorServer(Args args) {
+    super(args);
+    args.validate();
+    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
+    this.args = args;
+  }
+
+  /**
+   * Start the accept and selector threads running to deal with clients.
+   * 
+   * @return true if everything went ok, false if we couldn't start for some
+   *         reason.
+   */
+  @Override
+  protected boolean startThreads() {
+    try {
+      for (int i = 0; i < args.selectorThreads; ++i) {
+        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
+      }
+      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+          createSelectorThreadLoadBalancer(selectorThreads));
+      for (SelectorThread thread : selectorThreads) {
+        thread.start();
+      }
+      acceptThread.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start threads!", e);
+      return false;
+    }
+  }
+
+  /**
+   * Blocks until the accept and selector threads have exited (or the wait is
+   * interrupted), then shuts down the invoker pool.
+   */
+  @Override
+  protected void waitForShutdown() {
+    try {
+      joinThreads();
+    } catch (InterruptedException interrupted) {
+      // the join was cut short, so this is not a graceful shutdown
+      LOGGER.error("Interrupted while joining threads!", interrupted);
+    }
+
+    gracefullyShutdownInvokerPool();
+  }
+
+  /**
+   * Waits for the accept thread and then every selector thread to exit.
+   * 
+   * @throws InterruptedException if the calling thread is interrupted while
+   *         joining
+   */
+  protected void joinThreads() throws InterruptedException {
+    // wait until the io threads exit
+    acceptThread.join();
+    for (SelectorThread thread : selectorThreads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Stop serving: raise the stopped flag, close the server transport and wake
+   * every selector so the IO threads get a chance to notice.
+   */
+  @Override
+  public void stop() {
+    stopped_ = true;
+
+    // stop queuing connect attempts as early as possible
+    stopListening();
+
+    // the accept thread only exists once startThreads() has run
+    final AcceptThread acceptor = acceptThread;
+    if (acceptor != null) {
+      acceptor.wakeupSelector();
+    }
+
+    for (SelectorThread ioThread : selectorThreads) {
+      if (ioThread == null) {
+        continue;
+      }
+      ioThread.wakeupSelector();
+    }
+  }
+
+  /**
+   * Shuts the invoker pool down and waits up to the configured stop timeout
+   * for it to terminate. Does nothing when there is no invoker pool, which is
+   * the case when no executor service was supplied and workerThreads is 0
+   * (invocations then run inline on the selector threads).
+   */
+  protected void gracefullyShutdownInvokerPool() {
+    // createDefaultExecutor() returns null for zero worker threads, so there
+    // may be no pool to shut down at all.
+    if (invoker == null) {
+      return;
+    }
+
+    // try to gracefully shut down the executor service
+    invoker.shutdown();
+
+    // Loop until awaitTermination finally does return without an interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear its task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        // interrupted: retry with whatever is left of the timeout budget
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+  }
+
+  /**
+   * Queues the invocation on the invoker service instead of invoking it
+   * immediately. If there is no thread pool, the invocation is handled inline
+   * on the calling thread.
+   *
+   * @return true if the invocation ran inline or was handed to the invoker;
+   *         false if the invoker rejected it
+   */
+  @Override
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    Runnable task = getRunnable(frameBuffer);
+    if (invoker == null) {
+      // Invoke on the caller's thread
+      task.run();
+      return true;
+    }
+    try {
+      invoker.execute(task);
+    } catch (RejectedExecutionException rx) {
+      LOGGER.warn("ExecutorService rejected execution!", rx);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Creates the task that is run for the given frame buffer.
+   */
+  protected Runnable getRunnable(FrameBuffer frameBuffer) {
+    Runnable invocation = new Invocation(frameBuffer);
+    return invocation;
+  }
+
+  /**
+   * Helper to create the invoker if one is not specified.
+   *
+   * @return a fixed thread pool with {@code options.workerThreads} threads, or
+   *         null when {@code options.workerThreads} is not positive
+   */
+  protected static ExecutorService createDefaultExecutor(Args options) {
+    if (options.workerThreads <= 0) {
+      return null;
+    }
+    return Executors.newFixedThreadPool(options.workerThreads);
+  }
+
+  /**
+   * Creates the queue that holds accepted connections for a selector thread.
+   *
+   * @param queueSize
+   *          capacity of the queue; zero requests an unbounded queue
+   */
+  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
+    if (queueSize != 0) {
+      // Bounded queue with the requested capacity
+      return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
+    }
+    // Unbounded queue
+    return new LinkedBlockingQueue<TNonblockingTransport>();
+  }
+
+  /**
+   * The thread that selects on the server transport (listen socket) and accepts
+   * new connections to hand off to the IO selector threads.
+   * <p>
+   * The accept selector is opened in the constructor and closed when
+   * {@link #run()} exits, so that it is released on shutdown.
+   */
+  protected class AcceptThread extends Thread {
+
+    // The listen socket to accept on
+    private final TNonblockingServerTransport serverTransport;
+    // Selector the listen socket is registered with
+    private final Selector acceptSelector;
+
+    // Chooses which selector thread receives each accepted connection
+    private final SelectorThreadLoadBalancer threadChooser;
+
+    /**
+     * Set up the AcceptThread: opens a selector and registers the server
+     * transport with it.
+     *
+     * @throws IOException
+     *           if the selector cannot be opened
+     */
+    public AcceptThread(TNonblockingServerTransport serverTransport, SelectorThreadLoadBalancer threadChooser)
+        throws IOException {
+      this.serverTransport = serverTransport;
+      this.threadChooser = threadChooser;
+      this.acceptSelector = SelectorProvider.provider().openSelector();
+      this.serverTransport.registerSelector(acceptSelector);
+    }
+
+    /**
+     * The work loop. Selects on the server transport and accepts. If there was
+     * a server transport that had blocking accepts, and returned on blocking
+     * client transports, that should be used instead.
+     * <p>
+     * On exit, for any reason, the accept selector is closed and the enclosing
+     * server is stopped.
+     */
+    @Override
+    public void run() {
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
+        while (!stopped_) {
+          select();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        try {
+          // This class opened the selector, so it is responsible for closing it
+          acceptSelector.close();
+        } catch (IOException e) {
+          LOGGER.warn("Got an IOException while closing accept selector!", e);
+        } finally {
+          // This will wake up the selector threads
+          TThreadedSelectorServer.this.stop();
+        }
+      }
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      acceptSelector.wakeup();
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are connections to
+     * be accepted, accept them.
+     */
+    private void select() {
+      try {
+        // wait for connect events.
+        acceptSelector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            continue;
+          }
+
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection and hand it to the next selector thread. With
+     * FAST_ACCEPT, or when there is no invoker pool, the hand-off happens on
+     * this thread; otherwise it is submitted to the invoker pool.
+     */
+    private void handleAccept() {
+      final TNonblockingTransport client = doAccept();
+      if (client != null) {
+        // Pass this connection to a selector thread
+        final SelectorThread targetThread = threadChooser.nextThread();
+
+        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
+          doAddAccept(targetThread, client);
+        } else {
+          // FAIR_ACCEPT
+          try {
+            invoker.submit(new Runnable() {
+              public void run() {
+                doAddAccept(targetThread, client);
+              }
+            });
+          } catch (RejectedExecutionException rx) {
+            LOGGER.warn("ExecutorService rejected accept registration!", rx);
+            // close immediately
+            client.close();
+          }
+        }
+      }
+    }
+
+    /**
+     * Accepts one connection from the server transport.
+     *
+     * @return the accepted transport, or null if accepting failed
+     */
+    private TNonblockingTransport doAccept() {
+      try {
+        return (TNonblockingTransport) serverTransport.accept();
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        return null;
+      }
+    }
+
+    /**
+     * Hands the client to the given selector thread, closing the client if the
+     * thread does not take it.
+     */
+    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
+      if (!thread.addAcceptedConnection(client)) {
+        client.close();
+      }
+    }
+  } // AcceptThread
+
+  /**
+   * The SelectorThread(s) will be doing all the selecting on accepted active
+   * connections.
+   * <p>
+   * The selector is closed when {@link #run()} exits, so that it is released
+   * on shutdown.
+   */
+  protected class SelectorThread extends AbstractSelectThread {
+
+    // Accepted connections added by the accept thread.
+    private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+
+    /**
+     * Set up the SelectorThread with an unbounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread() throws IOException {
+      this(new LinkedBlockingQueue<TNonblockingTransport>());
+    }
+
+    /**
+     * Set up the SelectorThread with a bounded queue for incoming accepts. A
+     * size of zero yields an unbounded queue.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread(int maxPendingAccepts) throws IOException {
+      this(createDefaultAcceptQueue(maxPendingAccepts));
+    }
+
+    /**
+     * Set up the SelectorThread with a specified queue for connections.
+     * 
+     * @param acceptedQueue
+     *          The BlockingQueue implementation for holding incoming accepted
+     *          connections.
+     * @throws IOException
+     *           if a selector cannot be created.
+     */
+    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
+      this.acceptedQueue = acceptedQueue;
+    }
+
+    /**
+     * Hands off an accepted connection to be handled by this thread. This
+     * method will block if the queue for new connections is at capacity.
+     * 
+     * @param accepted
+     *          The connection that has been accepted.
+     * @return true if the connection has been successfully added.
+     */
+    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
+      try {
+        acceptedQueue.put(accepted);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while adding accepted connection!", e);
+        return false;
+      }
+      // Wake the selector so the queued connection gets registered promptly
+      selector.wakeup();
+      return true;
+    }
+
+    /**
+     * The work loop. Handles selecting (read/write IO), dispatching, and
+     * managing the selection preferences of all existing connections.
+     * <p>
+     * On exit, for any reason, the selector is closed and the enclosing server
+     * is stopped.
+     */
+    @Override
+    public void run() {
+      try {
+        Thread thread = Thread.currentThread();
+        // Only rename threads that still carry a "Thread-" prefixed name
+        if (thread.getName().startsWith("Thread-")) {
+          thread.setName("TThreadedSelectorServer-SelectorThread-" + selectorThreadCount++);
+        }
+        while (!stopped_) {
+          select();
+          processAcceptedConnections();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        try {
+          // Nothing in this class closes the selector otherwise
+          selector.close();
+        } catch (IOException e) {
+          LOGGER.warn("Got an IOException while closing selector!", e);
+        } finally {
+          // This will wake up the accept thread and the other selector threads
+          TThreadedSelectorServer.this.stop();
+        }
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are existing
+     * connections with data waiting to be read, read it, buffering until a
+     * whole frame has been read. If there are any pending responses, buffer
+     * them until their target client is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Drains the accepted-connection queue without blocking, registering each
+     * connection with this thread's selector. Stops early once the server is
+     * flagged as stopped.
+     */
+    private void processAcceptedConnections() {
+      // Register accepted connections
+      while (!stopped_) {
+        TNonblockingTransport accepted = acceptedQueue.poll();
+        if (accepted == null) {
+          break;
+        }
+        registerAccepted(accepted);
+      }
+    }
+
+    /**
+     * Registers the connection for reads on this thread's selector and attaches
+     * a new FrameBuffer to its key. On failure the key, if one was created, is
+     * cleaned up and the connection is closed.
+     */
+    private void registerAccepted(TNonblockingTransport accepted) {
+      SelectionKey clientKey = null;
+      try {
+        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
+
+        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+        clientKey.attach(frameBuffer);
+      } catch (IOException e) {
+        LOGGER.warn("Failed to register accepted connection to selector!", e);
+        if (clientKey != null) {
+          cleanupSelectionKey(clientKey);
+        }
+        accepted.close();
+      }
+    }
+  } // SelectorThread
+
+  /**
+   * Builds the SelectorThreadLoadBalancer that the accept thread uses to
+   * spread newly accepted connections across the given threads.
+   */
+  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
+    SelectorThreadLoadBalancer balancer = new SelectorThreadLoadBalancer(threads);
+    return balancer;
+  }
+
+  /**
+   * Hands out selector threads for new connections in round robin order.
+   */
+  protected class SelectorThreadLoadBalancer {
+    // Unmodifiable copy of the threads, taken at construction time
+    private final Collection<? extends SelectorThread> threads;
+    // Cursor over the copy; replaced with a fresh iterator once exhausted
+    private Iterator<? extends SelectorThread> nextThreadIterator;
+
+    /**
+     * @param threads
+     *          the selector threads to rotate through; must not be empty
+     * @throws IllegalArgumentException
+     *           if {@code threads} is empty
+     */
+    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
+      if (threads.isEmpty()) {
+        throw new IllegalArgumentException("At least one selector thread is required");
+      }
+      ArrayList<T> snapshot = new ArrayList<T>(threads);
+      this.threads = Collections.unmodifiableList(snapshot);
+      this.nextThreadIterator = this.threads.iterator();
+    }
+
+    /**
+     * Returns the next selector thread, wrapping around to the first one after
+     * the last has been handed out.
+     */
+    public SelectorThread nextThread() {
+      if (nextThreadIterator.hasNext()) {
+        return nextThreadIterator.next();
+      }
+      // Reached the end of the round: start over from the first thread
+      nextThreadIterator = threads.iterator();
+      return nextThreadIterator.next();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/util/BlurThriftHelper.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/BlurThriftHelper.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/BlurThriftHelper.java
new file mode 100644
index 0000000..8beedc1
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/BlurThriftHelper.java
@@ -0,0 +1,105 @@
+package org.apache.blur.thrift.util;
+
+import java.util.Arrays;
+
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.SimpleQuery;
+
+/**
+ * Static factory and lookup helpers for the generated Blur Thrift types.
+ */
+public class BlurThriftHelper {
+
+  /**
+   * Creates a Selector with the given location id.
+   */
+  public static Selector newSelector(String locationId) {
+    Selector selector = new Selector();
+    selector.locationId = locationId;
+    return selector;
+  }
+
+  /**
+   * Creates a BlurQuery holding a SimpleQuery for the given query string and a
+   * new, empty Selector.
+   */
+  public static BlurQuery newSimpleQuery(String query) {
+    BlurQuery blurQuery = new BlurQuery();
+    SimpleQuery simpleQuery = new SimpleQuery();
+    simpleQuery.setQueryStr(query);
+    blurQuery.setSimpleQuery(simpleQuery);
+    blurQuery.setSelector(new Selector());
+    return blurQuery;
+  }
+
+  /**
+   * Creates a REPLACE_ROW RowMutation for the given table and row.
+   */
+  public static RowMutation newRowMutation(String table, String rowId, RecordMutation... mutations) {
+    return newRowMutation(RowMutationType.REPLACE_ROW, table, rowId, mutations);
+  }
+
+  /**
+   * Creates a RowMutation of the given type for the given table and row,
+   * adding each record mutation in order.
+   */
+  public static RowMutation newRowMutation(RowMutationType type, String table, String rowId,
+      RecordMutation... mutations) {
+    RowMutation mutation = new RowMutation();
+    mutation.setRowId(rowId);
+    mutation.setTable(table);
+    mutation.setRowMutationType(type);
+    for (RecordMutation recordMutation : mutations) {
+      mutation.addToRecordMutations(recordMutation);
+    }
+    return mutation;
+  }
+
+  /**
+   * Creates a Record with the given family, record id and columns.
+   */
+  public static Record newRecord(String family, String recordId, Column... columns) {
+    Record record = new Record();
+    record.setRecordId(recordId);
+    record.setFamily(family);
+    // Arrays.asList returns a fixed-size view of the array; store a growable
+    // copy so that columns can still be added to the record afterwards.
+    record.setColumns(new java.util.ArrayList<Column>(Arrays.asList(columns)));
+    return record;
+  }
+
+  /**
+   * Creates a Row with the given id, adding each record in order.
+   */
+  public static Row newRow(String rowId, Record... records) {
+    Row row = new Row().setId(rowId);
+    for (Record record : records) {
+      row.addToRecords(record);
+    }
+    return row;
+  }
+
+  /**
+   * Creates a Column with the given name and value.
+   */
+  public static Column newColumn(String name, String value) {
+    return new Column().setName(name).setValue(value);
+  }
+
+  /**
+   * Creates a REPLACE_ENTIRE_RECORD RecordMutation for the given record.
+   */
+  public static RecordMutation newRecordMutation(String family, String recordId, Column... columns) {
+    return newRecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, family, recordId, columns);
+  }
+
+  /**
+   * Creates a RecordMutation of the given type wrapping a new Record built from
+   * the family, record id and columns.
+   */
+  public static RecordMutation newRecordMutation(RecordMutationType type, String family, String recordId,
+      Column... columns) {
+    Record record = new Record();
+    record.setRecordId(recordId);
+    record.setFamily(family);
+    for (Column column : columns) {
+      record.addToColumns(column);
+    }
+
+    RecordMutation mutation = new RecordMutation();
+    mutation.setRecordMutationType(type);
+    mutation.setRecord(record);
+    return mutation;
+  }
+
+  /**
+   * Finds the first record mutation in the row mutation whose record matches
+   * the given record (see {@link #match(Record, Record)}).
+   *
+   * @return the matching record mutation, or null if there is none or the row
+   *         mutation has no record mutations
+   */
+  public static RecordMutation findRecordMutation(RowMutation mutation, Record record) {
+    if (mutation.recordMutations == null) {
+      // No record mutations were ever added
+      return null;
+    }
+    for (RecordMutation recordMutation : mutation.recordMutations) {
+      if (match(recordMutation, record)) {
+        return recordMutation;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns true if the mutation's record matches the given record.
+   */
+  public static boolean match(RecordMutation mutation, Record record) {
+    return match(mutation.record, record);
+  }
+
+  /**
+   * Returns true if both records have equal record ids and equal families. A
+   * null record never matches, and a null id or family only equals another
+   * null.
+   */
+  public static boolean match(Record left, Record right) {
+    if (left == null || right == null) {
+      return false;
+    }
+    return nullSafeEquals(left.recordId, right.recordId) && nullSafeEquals(left.family, right.family);
+  }
+
+  /** Equality check that tolerates null on either side. */
+  private static boolean nullSafeEquals(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateInsertQueryRepeating.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateInsertQueryRepeating.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateInsertQueryRepeating.java
new file mode 100644
index 0000000..ae89b6f
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateInsertQueryRepeating.java
@@ -0,0 +1,241 @@
+package org.apache.blur.thrift.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+
+import static org.apache.blur.thrift.util.BlurThriftHelper.*;
+
+/**
+ * Tests a lot of things, mainly connecting to a blur cluster and slamming a
+ * bunch of rows in before querying for them. I like to use it as a load test.
+ * 
+ */
+public class CreateInsertQueryRepeating {
+
+  // Formats the rows/s and queries/s figures in the progress output
+  private DecimalFormat df = new DecimalFormat("#,###,000.00");
+  // Alphabet for randomString: the digits 0-9 followed by the letters a-z
+  private static final char[] symbols = new char[36];
+
+  static {
+    for (int idx = 0; idx < 10; ++idx)
+      symbols[idx] = (char) ('0' + idx);
+    for (int idx = 10; idx < 36; ++idx)
+      symbols[idx] = (char) ('a' + idx - 10);
+  }
+
+  private String table = "test1";
+  private String host = "localhost";
+  private Iface client = null;
+
+  /**
+   * Connects to the controller at {@code host:40010}, picks the first shard
+   * cluster it reports, and creates the table if it is not listed yet.
+   *
+   * @throws IOException
+   *           if the controller reports no shard cluster
+   */
+  public CreateInsertQueryRepeating(String host, String table) throws BlurException, TException, IOException {
+    this.host = host;
+    this.table = table;
+
+    // init
+    String connectionStr = host + ":40010";
+    String cluster = "default";
+    client = BlurClient.getClient(connectionStr);
+
+    List<String> clusterList = client.shardClusterList();
+    if (clusterList != null && clusterList.size() > 0)
+      cluster = clusterList.get(0);
+    else
+      throw new IOException("cannot find a cluster to use :(");
+
+    System.out.println("using cluster: " + cluster);
+
+    List<String> tableList = client.tableList();
+    if (tableList == null || !tableList.contains(table))
+      createTable(client, table, cluster);
+    else
+      System.out.println("table existed, did not create.");
+  }
+
+  private final Random random = new Random();
+
+  /**
+   * Returns a random string of the given length drawn from 0-9 and a-z.
+   */
+  public String randomString(int length) {
+    char[] buf = new char[length];
+
+    for (int idx = 0; idx < buf.length; ++idx)
+      buf[idx] = symbols[random.nextInt(symbols.length)];
+    return new String(buf);
+  }
+
+  /**
+   * Prints every shard cluster the client reports. Failures are printed as
+   * stack traces rather than propagated.
+   */
+  public void getClusters(Iface client) {
+    try {
+      List<String> shardClusterList = client.shardClusterList();
+      for (String cluster : shardClusterList)
+        System.out.println("cluster: " + cluster);
+    } catch (BlurException e) {
+      e.printStackTrace();
+    } catch (TException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates an enabled, writable table with one shard per shard server of the
+   * cluster.
+   *
+   * @param cluster
+   *          the cluster to create the table in; null means "default"
+   */
+  public void createTable(Iface client, String tableName, String cluster) throws BlurException, TException {
+    TableDescriptor td = new TableDescriptor();
+    td.analyzerDefinition = new AnalyzerDefinition();
+
+    td.name = tableName;
+    // TODO: doc doesnt say required, yet it barfs without it?
+    td.cluster = cluster == null ? "default" : cluster;
+    // auto enable table
+    td.isEnabled = true;
+
+    // 1 shard per server :)
+    // Query the cluster the table is created in, i.e. with the null default
+    // applied, rather than the raw argument.
+    td.shardCount = client.shardServerList(td.cluster).size();
+    td.readOnly = false;
+    // TODO: hardcodes bad, assuming NN on same node as BC
+    td.tableUri = "hdfs://" + host + ":8020/" + tableName;
+    client.createTable(td);
+    System.out.println("table created");
+  }
+
+  /**
+   * Loads 100 rows and then runs 50000 queries.
+   *
+   * @param args
+   *          optional: args[0] is the host (default "localhost"), args[1] is
+   *          the table (default "test1"); further arguments are ignored
+   * @throws TException
+   * @throws BlurException
+   * @throws IOException
+   */
+  public static void main(String[] args) throws BlurException, TException, IOException {
+    String host = "localhost";
+    String table = "test1";
+
+    if (args != null) {
+      if (args.length >= 1)
+        host = args[0];
+      // ">=" so that the table is still honored when extra arguments follow
+      if (args.length >= 2)
+        table = args[1];
+    }
+
+    CreateInsertQueryRepeating test = new CreateInsertQueryRepeating(host, table);
+
+    // System.out.println("Testing joins real quick");
+    // test.testJoin();
+    // System.out.println("test done");
+
+    System.out.println("Starting load");
+    test.loadupTable(100);
+    System.out.println("Finshed load");
+
+    System.out.println("query time!");
+    test.queryTable(50000);
+    System.out.println("query done!");
+
+    System.exit(0);
+  }
+
+  /**
+   * Writes one row with two records and prints the hit count of several
+   * queries against it.
+   */
+  @SuppressWarnings("unused")
+  private void testJoin() throws BlurException, TException {
+    RowMutation mutation = new RowMutation();
+    mutation.table = table;
+    mutation.waitToBeVisible = true;
+    mutation.rowId = "row1";
+    mutation.addToRecordMutations(newRecordMutation("cf1", "recordid1", newColumn("col1", "value1")));
+    mutation.addToRecordMutations(newRecordMutation("cf1", "recordid2", newColumn("col2", "value2")));
+    mutation.rowMutationType = RowMutationType.REPLACE_ROW;
+    client.mutate(mutation);
+
+    List<String> joinTest = new ArrayList<String>();
+    joinTest.add("+cf1.col1:value1");
+    joinTest.add("+cf1.col2:value2");
+    joinTest.add("+cf1.col1:value1 +cf1.col2:value2");
+    joinTest.add("+(+cf1.col1:value1 nocf.nofield:somevalue) +(+cf1.col2.value2 nocf.nofield:somevalue)");
+    joinTest.add("+(+cf1.col1:value1) +(cf1.bla:bla +cf1.col2.value2)");
+
+    for (String q : joinTest)
+      System.out.println(q + " hits: " + hits(client, table, q, true));
+  }
+
+  /**
+   * Runs the query against the table and returns its total result count.
+   */
+  private static long hits(Iface client, String table, String queryStr, boolean superQuery) throws BlurException, TException {
+    BlurQuery bq = new BlurQuery();
+    SimpleQuery sq = new SimpleQuery();
+    sq.queryStr = queryStr;
+    sq.superQueryOn = superQuery;
+    bq.simpleQuery = sq;
+    BlurResults query = client.query(table, bq);
+    return query.totalResults;
+  }
+
+  /**
+   * Runs the given number of queries on numberField with random values below
+   * 1000, printing the query rate every 1000 queries and once at the end.
+   */
+  // really only useful against the table that was filled via loadupTable
+  public void queryTable(int times) throws BlurException, TException {
+    long start = System.currentTimeMillis();
+    BlurQuery bq = new BlurQuery();
+    bq.fetch = 10;
+    for (int i = 1; i <= times; i++) {
+      SimpleQuery sq = new SimpleQuery();
+      sq.queryStr = "numberField:" + random.nextInt(1000);
+      sq.superQueryOn = true;
+      bq.simpleQuery = sq;
+      client.query(table, bq);
+      if (i % 1000 == 0) {
+        System.out.println("queries: " + i + " times " + df.format((i / ((System.currentTimeMillis() - start + 0.0) / 1000))) + " queries/s");
+      }
+    }
+    System.out.println("queries: " + times + " times " + df.format((times / ((System.currentTimeMillis() - start + 0.0) / 1000))) + " queries/s");
+
+  }
+
+  /**
+   * Inserts the given number of rows, each with a random row id and a single
+   * record, printing the load rate and timing every 50 rows and the overall
+   * rate at the end.
+   */
+  public void loadupTable(int rows) throws BlurException, TException, IOException {
+
+    long start = System.currentTimeMillis();
+
+    // Milliseconds spent building mutations; the mutate calls are not included
+    long buildTotal = 0;
+    RowMutation mutation = new RowMutation();
+
+    for (int i = 1; i <= rows; i++) {
+      long buildStart = System.currentTimeMillis();
+      mutation.clear();
+      mutation.table = table;
+      mutation.waitToBeVisible = false;
+      mutation.rowId = UUID.randomUUID().toString();
+      mutation.addToRecordMutations(newRecordMutation("test", "test-" + i, newColumn("uuidField", UUID.randomUUID().toString()), newColumn("numberField", i + ""),
+          newColumn("fatTextField", randomString(1000))));
+      mutation.rowMutationType = RowMutationType.REPLACE_ROW;
+
+      // Account for this row before reporting, so that the report includes it
+      // and the time spent printing is not counted as build time.
+      buildTotal += System.currentTimeMillis() - buildStart;
+
+      if (i % 50 == 0) {
+        System.out.println("loaded: " + i + " around " + df.format((i / ((System.currentTimeMillis() - start + 0.0) / 1000))) + " rows/s");
+        // Divide as a double; integer division would drop the fractional seconds
+        System.out.println("Total time: " + (System.currentTimeMillis() - start + 0.0) / 1000 + " Build time: " + (buildTotal / 1000.0) + " " + buildTotal);
+      }
+
+      client.mutate(mutation);
+
+    }
+    System.out.println("loaded: " + rows + " around " + df.format((rows / ((System.currentTimeMillis() - start + 0.0) / 1000))) + " rows/s");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateTable.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateTable.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateTable.java
new file mode 100644
index 0000000..1e76063
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CreateTable.java
@@ -0,0 +1,50 @@
+package org.apache.blur.thrift.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+
+/**
+ * Command line tool that creates a writable Blur table.
+ */
+public class CreateTable {
+
+  /**
+   * Creates the table described by the arguments.
+   *
+   * @param args
+   *          connection string, cluster, table name, shard count and table
+   *          URI, in that order
+   */
+  public static void main(String[] args) throws BlurException, TException, IOException {
+    if (args == null || args.length < 5) {
+      // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException
+      System.err.println("Usage: CreateTable <connectionStr> <cluster> <tableName> <shardCount> <tableUri>");
+      System.exit(1);
+      return;
+    }
+
+    String connectionStr = args[0];
+    final String cluster = args[1];
+    final String tableName = args[2];
+    int shardCount = Integer.parseInt(args[3]);
+    String uri = args[4];
+
+    final TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.analyzerDefinition = new AnalyzerDefinition();
+    tableDescriptor.cluster = cluster;
+    tableDescriptor.name = tableName;
+    tableDescriptor.readOnly = false;
+
+    tableDescriptor.shardCount = shardCount;
+    tableDescriptor.tableUri = uri;
+
+    Iface client = BlurClient.getClient(connectionStr);
+    client.createTable(tableDescriptor);
+  }
+}


Mime
View raw message