accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [18/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:43 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
new file mode 100644
index 0000000..7e2bbd1
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This iterator facilitates document-partitioned indexing. It is an example of extending the IntersectingIterator to customize the placement of the term and
+ * docID. As with the IntersectingIterator, documents are grouped together and indexed into a single row of an Accumulo table. This allows a tablet server to
+ * perform boolean AND operations on terms in the index. This iterator also stores the document contents in a separate column family in the same row so that the
+ * full document can be returned with each query.
+ * 
+ * The table structure should have the following form:
+ * 
+ * row: shardID, colfam: docColf\0doctype, colqual: docID, value: doc
+ * 
+ * row: shardID, colfam: indexColf, colqual: term\0doctype\0docID\0info, value: (empty)
+ * 
+ * When you configure this iterator with a set of terms, it will return only the docIDs and docs that appear with all of the specified terms. The result will
+ * have the following form:
+ * 
+ * row: shardID, colfam: indexColf, colqual: doctype\0docID\0info, value: doc
+ * 
+ * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
+ */
+public class IndexedDocIterator extends IntersectingIterator {
+  public static final Text DEFAULT_INDEX_COLF = new Text("i");
+  public static final Text DEFAULT_DOC_COLF = new Text("e");
+  
+  private static final String indexFamilyOptionName = "indexFamily";
+  private static final String docFamilyOptionName = "docFamily";
+  
+  // per-instance configuration (static fields here would share state across concurrent scans)
+  private Text indexColf = DEFAULT_INDEX_COLF;
+  private Text docColf = DEFAULT_DOC_COLF;
+  private Set<ByteSequence> indexColfSet;
+  private Set<ByteSequence> docColfSet;
+  
+  private static final byte[] nullByte = {0};
+  
+  public SortedKeyValueIterator<Key,Value> docSource;
+  
+  @Override
+  protected Key buildKey(Text partition, Text term, Text docID) {
+    Text colq = new Text(term);
+    colq.append(nullByte, 0, 1);
+    colq.append(docID.getBytes(), 0, docID.getLength());
+    colq.append(nullByte, 0, 1);
+    return new Key(partition, indexColf, colq);
+  }
+  
+  @Override
+  protected Key buildKey(Text partition, Text term) {
+    Text colq = new Text(term);
+    return new Key(partition, indexColf, colq);
+  }
+  
+  @Override
+  protected Text getDocID(Key key) {
+    return parseDocID(key);
+  }
+  
+  public static Text parseDocID(Key key) {
+    Text colq = key.getColumnQualifier();
+    int firstZeroIndex = colq.find("\0");
+    if (firstZeroIndex < 0) {
+      throw new IllegalArgumentException("bad docid: " + key.toString());
+    }
+    int secondZeroIndex = colq.find("\0", firstZeroIndex + 1);
+    if (secondZeroIndex < 0) {
+      throw new IllegalArgumentException("bad docid: " + key.toString());
+    }
+    int thirdZeroIndex = colq.find("\0", secondZeroIndex + 1);
+    if (thirdZeroIndex < 0) {
+      throw new IllegalArgumentException("bad docid: " + key.toString());
+    }
+    Text docID = new Text();
+    try {
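+      // the extracted docID spans doctype\0docID, i.e. the bytes between the first and third null separators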
+      docID.set(colq.getBytes(), firstZeroIndex + 1, thirdZeroIndex - 1 - firstZeroIndex);
+    } catch (ArrayIndexOutOfBoundsException e) {
+      throw new IllegalArgumentException("bad indices for docid: " + key.toString() + " " + firstZeroIndex + " " + secondZeroIndex + " " + thirdZeroIndex);
+    }
+    return docID;
+  }
+  
+  @Override
+  protected Text getTerm(Key key) {
+    if (indexColf.compareTo(key.getColumnFamily().getBytes(), 0, indexColf.getLength()) < 0) {
+      // We're past the index column family, so return a term that will sort after any real term.
+      // U+FFFD is near the top of the BMP and should sort after any expected term bytes.
+      return new Text("\uFFFD");
+    }
+    Text colq = key.getColumnQualifier();
+    int zeroIndex = colq.find("\0");
+    Text term = new Text();
+    term.set(colq.getBytes(), 0, zeroIndex);
+    return term;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    if (options.containsKey(indexFamilyOptionName))
+      indexColf = new Text(options.get(indexFamilyOptionName));
+    if (options.containsKey(docFamilyOptionName))
+      docColf = new Text(options.get(docFamilyOptionName));
+    docSource = source.deepCopy(env);
+    indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
+    
+    for (TermSource ts : this.sources) {
+      ts.seekColfams = indexColfSet;
+    }
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
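+    // the passed seekColumnFamilies are intentionally ignored; each TermSource seeks with the index colf set in init()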
+    super.seek(range, null, true);
+  }
+  
+  @Override
+  protected void advanceToIntersection() throws IOException {
+    super.advanceToIntersection();
+    if (topKey == null)
+      return;
+    if (log.isTraceEnabled())
+      log.trace("using top key to seek for doc: " + topKey.toString());
+    Key docKey = buildDocKey();
+    docSource.seek(new Range(docKey, true, null, false), docColfSet, true);
+    log.debug("got doc key: " + docSource.getTopKey().toString());
+    if (docSource.hasTop() && docKey.equals(docSource.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
+      value = docSource.getTopValue();
+    }
+    log.debug("got doc value: " + value.toString());
+  }
+  
+  protected Key buildDocKey() {
+    if (log.isTraceEnabled())
+      log.trace("building doc key for " + currentPartition + " " + currentDocID);
+    int zeroIndex = currentDocID.find("\0");
+    if (zeroIndex < 0)
+      throw new IllegalArgumentException("bad current docID");
+    Text colf = new Text(docColf);
+    colf.append(nullByte, 0, 1);
+    colf.append(currentDocID.getBytes(), 0, zeroIndex);
+    docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(), 0, colf.getLength()));
+    if (log.isTraceEnabled())
+      log.trace(zeroIndex + " " + currentDocID.getLength());
+    Text colq = new Text();
+    colq.set(currentDocID.getBytes(), zeroIndex + 1, currentDocID.getLength() - zeroIndex - 1);
+    Key k = new Key(currentPartition, colf, colq);
+    if (log.isTraceEnabled())
+      log.trace("built doc key for seek: " + k.toString());
+    return k;
+  }
+  
+  /**
+   * A convenience method for setting the index column family.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param indexColf
+   *          the index column family
+   */
+  public static void setIndexColf(IteratorSetting is, String indexColf) {
+    is.addOption(indexFamilyOptionName, indexColf);
+  }
+  
+  /**
+   * A convenience method for setting the document column family prefix.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param docColfPrefix
+   *          the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
+   */
+  public static void setDocColfPrefix(IteratorSetting is, String docColfPrefix) {
+    is.addOption(docFamilyOptionName, docColfPrefix);
+  }
+  
+  /**
+   * A convenience method for setting the index column family and document column family prefix.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param indexColf
+   *          the index column family
+   * @param docColfPrefix
+   *          the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
+   */
+  public static void setColfs(IteratorSetting is, String indexColf, String docColfPrefix) {
+    setIndexColf(is, indexColf);
+    setDocColfPrefix(is, docColfPrefix);
+  }
+}
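
For reference, a minimal client-side sketch of driving this iterator (assuming the usual Accumulo 1.5 client imports, an existing Connector named "conn", and a hypothetical table "shardTable"; setColumnFamilies is inherited from IntersectingIterator):

    // find all docs containing both terms, searching every shard in parallel
    BatchScanner bs = conn.createBatchScanner("shardTable", new Authorizations(), 10);
    bs.setRanges(Collections.singleton(new Range()));
    IteratorSetting is = new IteratorSetting(20, "ii", IndexedDocIterator.class);
    IntersectingIterator.setColumnFamilies(is, new Text[] {new Text("apple"), new Text("banana")});
    IndexedDocIterator.setColfs(is, "i", "e"); // the defaults; shown for completeness
    bs.addScanIterator(is);
    for (Map.Entry<Key,Value> entry : bs) {
      // colqual holds doctype\0docID\0info; the value is the full document
      System.out.println(entry.getKey().getColumnQualifier() + " -> " + entry.getValue());
    }
    bs.close();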

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
new file mode 100644
index 0000000..ed560b5
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
+ * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
+ * 
+ * The table structure should have the following form:
+ * 
+ * row: shardID, colfam: term, colqual: docID
+ * 
+ * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
+ * result will have an empty column family, as follows:
+ * 
+ * row: shardID, colfam: (empty), colqual: docID
+ * 
+ * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
+ * 
+ * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections
+ * over terms. Extending classes should set {@link TermSource#seekColfams} in their implementation's 
+ * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
+ * 
+ * README.shard in docs/examples shows an example of using the IntersectingIterator.
+ */
+public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  protected Text nullText = new Text();
+  
+  protected Text getPartition(Key key) {
+    return key.getRow();
+  }
+  
+  protected Text getTerm(Key key) {
+    return key.getColumnFamily();
+  }
+  
+  protected Text getDocID(Key key) {
+    return key.getColumnQualifier();
+  }
+  
+  protected Key buildKey(Text partition, Text term) {
+    return new Key(partition, (term == null) ? nullText : term);
+  }
+  
+  protected Key buildKey(Text partition, Text term, Text docID) {
+    return new Key(partition, (term == null) ? nullText : term, docID);
+  }
+  
+  protected Key buildFollowingPartitionKey(Key key) {
+    return key.followingKey(PartialKey.ROW);
+  }
+  
+  protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
+  
+  protected static class TermSource {
+    public SortedKeyValueIterator<Key,Value> iter;
+    public Text term;
+    public Collection<ByteSequence> seekColfams;
+    public boolean notFlag;
+    
+    public TermSource(TermSource other) {
+      this.iter = other.iter;
+      this.term = other.term;
+      this.notFlag = other.notFlag;
+      this.seekColfams = other.seekColfams;
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
+      this(iter, term, false);
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
+      this.iter = iter;
+      this.term = term;
+      this.notFlag = notFlag;
+      // The desired column families for this source is the term itself
+      this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
+    }
+    
+    public String getTermString() {
+      return (this.term == null) ? "Iterator" : this.term.toString();
+    }
+  }
+  
+  TermSource[] sources;
+  int sourcesCount = 0;
+  
+  Range overallRange;
+  
+  // query-time settings
+  protected Text currentPartition = null;
+  protected Text currentDocID = new Text(emptyByteArray);
+  static final byte[] emptyByteArray = new byte[0];
+  
+  protected Key topKey = null;
+  protected Value value = new Value(emptyByteArray);
+  
+  public IntersectingIterator() {}
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new IntersectingIterator(this, env);
+  }
+  
+  private IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) {
+    if (other.sources != null) {
+      sourcesCount = other.sourcesCount;
+      sources = new TermSource[sourcesCount];
+      for (int i = 0; i < sourcesCount; i++) {
+        sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term, other.sources[i].notFlag);
+      }
+    }
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return topKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    // we don't really care about values
+    return value;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return currentPartition != null;
+  }
+  
+  // precondition: currentPartition is not null
+  private boolean seekOneSource(int sourceID) throws IOException {
+    // find the next key in the appropriate column family that is at or beyond the cursor (currentPartition, currentDocID)
+    // advance the cursor if this source goes beyond it
+    // return whether we advanced the cursor
+    
+    // within this loop progress must be made in one of the following forms:
+    // - currentPartition or currentDocID must be increased
+    // - the given source must advance its iterator
+    // this loop will end when any of the following criteria are met
+    // - the iterator for the given source is pointing to the key (currentPartition, sources[sourceID].term, currentDocID)
+    // - the given source is out of data and currentPartition is set to null
+    // - the given source has advanced beyond the end row and currentPartition is set to null
+    boolean advancedCursor = false;
+    
+    if (sources[sourceID].notFlag) {
+      while (true) {
+        if (!sources[sourceID].iter.hasTop()) {
+          // an empty column that you are negating is a valid condition
+          break;
+        }
+        // check if we're past the end key
+        int endCompare = -1;
+        // we should compare the row to the end of the range
+        if (overallRange.getEndKey() != null) {
+          endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
+          if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+            // an empty column that you are negating is a valid condition
+            break;
+          }
+        }
+        int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
+        // check if this source is already at or beyond currentPartition
+        // if not, then seek to at least the current partition
+        if (partitionCompare > 0) {
+          // seek to at least the currentPartition
+          Key seekKey = buildKey(currentPartition, sources[sourceID].term);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+          continue;
+        }
+        // check if this source has gone beyond currentPartition
+        // if so, this is a valid condition for negation
+        if (partitionCompare < 0) {
+          break;
+        }
+        // we have verified that the current source is positioned in currentPartition
+        // now we must make sure we're in the right columnFamily in the current row
+        // Note: Iterators are auto-magically set to the correct columnFamily
+        if (sources[sourceID].term != null) {
+          int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
+          // check if this source is already on the right columnFamily
+          // if not, then seek forwards to the right columnFamily
+          if (termCompare > 0) {
+            Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+            continue;
+          }
+          // check if this source is beyond the right columnFamily
+          // if so, then this is a valid condition for negating
+          if (termCompare < 0) {
+            break;
+          }
+        }
+        
+        // we have verified that we are in currentPartition and the correct column family
+        // make sure we are at or beyond currentDocID
+        Text docID = getDocID(sources[sourceID].iter.getTopKey());
+        int docIDCompare = currentDocID.compareTo(docID);
+        // If we are past the target, this is a valid result
+        if (docIDCompare < 0) {
+          break;
+        }
+        // if this source is not yet at currentDocID then advance in this source
+        if (docIDCompare > 0) {
+          // seek forwards
+          Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+          continue;
+        }
+        // if we are equal to the target, this is an invalid result.
+        // Force the entire process to go to the next row.
+        // We are advancing column 0 because we forced that column to not contain a !
+        // when we did the init()
+        if (docIDCompare == 0) {
+          sources[0].iter.next();
+          advancedCursor = true;
+          break;
+        }
+      }
+    } else {
+      while (true) {
+        if (!sources[sourceID].iter.hasTop()) {
+          currentPartition = null;
+          // setting currentPartition to null counts as advancing the cursor
+          return true;
+        }
+        // check if we're past the end key
+        int endCompare = -1;
+        // we should compare the row to the end of the range
+        
+        if (overallRange.getEndKey() != null) {
+          endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
+          if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+            currentPartition = null;
+            // setting currentPartition to null counts as advancing the cursor
+            return true;
+          }
+        }
+        int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
+        // check if this source is already at or beyond currentPartition
+        // if not, then seek to at least the current partition
+        if (partitionCompare > 0) {
+          // seek to at least the currentPartition
+          Key seekKey = buildKey(currentPartition, sources[sourceID].term);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+          continue;
+        }
+        // check if this source has gone beyond currentPartition
+        // if so, advance currentPartition
+        if (partitionCompare < 0) {
+          currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
+          currentDocID.set(emptyByteArray);
+          advancedCursor = true;
+          continue;
+        }
+        // we have verified that the current source is positioned in currentPartition
+        // now we must make sure we're in the right columnFamily in the current row
+        // Note: Iterators are auto-magically set to the correct columnFamily
+        
+        if (sources[sourceID].term != null) {
+          int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
+          // check if this source is already on the right columnFamily
+          // if not, then seek forwards to the right columnFamily
+          if (termCompare > 0) {
+            Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+            continue;
+          }
+          // check if this source is beyond the right columnFamily
+          // if so, then seek to the next row
+          if (termCompare < 0) {
+            // we're out of entries in the current row, so seek to the next one
+            // byte[] currentRowBytes = currentRow.getBytes();
+            // byte[] nextRow = new byte[currentRowBytes.length + 1];
+            // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
+            // nextRow[currentRowBytes.length] = (byte)0;
+            // // we should reuse text objects here
+            // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
+            if (endCompare == 0) {
+              // we're done
+              currentPartition = null;
+              // setting currentPartition to null counts as advancing the cursor
+              return true;
+            }
+            Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
+            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+            continue;
+          }
+        }
+        // we have verified that we are in currentPartition and the correct column family
+        // make sure we are at or beyond currentDocID
+        Text docID = getDocID(sources[sourceID].iter.getTopKey());
+        int docIDCompare = currentDocID.compareTo(docID);
+        // if this source has advanced beyond the current docID then advance currentDocID
+        if (docIDCompare < 0) {
+          currentDocID.set(docID);
+          advancedCursor = true;
+          break;
+        }
+        // if this source is not yet at currentDocID then seek in this source
+        if (docIDCompare > 0) {
+          // seek forwards
+          Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+          sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+          continue;
+        }
+        // this source is at the current partition, in its column family, and at currentDocID
+        break;
+      }
+    }
+    return advancedCursor;
+  }
+  
+  @Override
+  public void next() throws IOException {
+    if (currentPartition == null) {
+      return;
+    }
+    // precondition: the current partition is set up and the sources are all at the same docID
+    // while we don't have a match, advance the first source and re-intersect
+    sources[0].iter.next();
+    advanceToIntersection();
+  }
+  
+  protected void advanceToIntersection() throws IOException {
+    boolean cursorChanged = true;
+    while (cursorChanged) {
+      // seek all of the sources to at least the highest seen column qualifier in the current row
+      cursorChanged = false;
+      for (int i = 0; i < sourcesCount; i++) {
+        if (currentPartition == null) {
+          topKey = null;
+          return;
+        }
+        if (seekOneSource(i)) {
+          cursorChanged = true;
+          break;
+        }
+      }
+    }
+    topKey = buildKey(currentPartition, nullText, currentDocID);
+  }
+  
+  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+    if (iter.hasTop())
+      return iter.getTopKey().toString();
+    return "";
+  }
+  
+  private static final String columnFamiliesOptionName = "columnFamilies";
+  private static final String notFlagOptionName = "notFlag";
+  
+  /**
+   * @param columns
+   *          the column families (terms) to encode
+   * @return the columns, Base64-encoded and newline-separated
+   */
+  protected static String encodeColumns(Text[] columns) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < columns.length; i++) {
+      sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+  
+  /**
+   * @param flags
+   *          the NOT flags to encode
+   * @return the flags as a Base64-encoded string
+   */
+  protected static String encodeBooleans(boolean[] flags) {
+    byte[] bytes = new byte[flags.length];
+    for (int i = 0; i < flags.length; i++) {
+      bytes[i] = (byte) (flags[i] ? 1 : 0);
+    }
+    return new String(Base64.encodeBase64(bytes));
+  }
+  
+  protected static Text[] decodeColumns(String columns) {
+    String[] columnStrings = columns.split("\n");
+    Text[] columnTexts = new Text[columnStrings.length];
+    for (int i = 0; i < columnStrings.length; i++) {
+      columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
+    }
+    return columnTexts;
+  }
+  
+  /**
+   * @param flags
+   *          the encoded NOT flags, or null
+   * @return the decoded flags, or null if flags was null
+   */
+  protected static boolean[] decodeBooleans(String flags) {
+    // return null if there were no flags
+    if (flags == null)
+      return null;
+    
+    byte[] bytes = Base64.decodeBase64(flags.getBytes());
+    boolean[] bFlags = new boolean[bytes.length];
+    for (int i = 0; i < bytes.length; i++) {
+      bFlags[i] = (bytes[i] == 1);
+    }
+    return bFlags;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    Text[] terms = decodeColumns(options.get(columnFamiliesOptionName));
+    boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
+    
+    if (terms.length < 2) {
+      throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
+    }
+    
+    // Scan the not flags.
+    // There must be at least one term that isn't negated,
+    // and we re-order the terms so that the first term is not negated.
+    if (notFlag == null) {
+      // a fresh boolean array is already all false
+      notFlag = new boolean[terms.length];
+    }
+    if (notFlag[0]) {
+      for (int i = 1; i < notFlag.length; i++) {
+        if (!notFlag[i]) {
+          Text swapFamily = new Text(terms[0]);
+          terms[0].set(terms[i]);
+          terms[i].set(swapFamily);
+          notFlag[0] = false;
+          notFlag[i] = true;
+          break;
+        }
+      }
+      if (notFlag[0]) {
+        throw new IllegalArgumentException("IntersectionIterator requires at lest one column family without not");
+      }
+    }
+    
+    sources = new TermSource[terms.length];
+    sources[0] = new TermSource(source, terms[0]);
+    for (int i = 1; i < terms.length; i++) {
+      sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
+    }
+    sourcesCount = terms.length;
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+    overallRange = new Range(range);
+    currentPartition = new Text();
+    currentDocID.set(emptyByteArray);
+    
+    
+    // seek each of the sources to the right column family within the row given by key
+    for (int i = 0; i < sourcesCount; i++) {
+      Key sourceKey;
+      if (range.getStartKey() != null) {
+        if (range.getStartKey().getColumnQualifier() != null) {
+          sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term, range.getStartKey().getColumnQualifier());
+        } else {
+          sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
+        }
+        // Seek only to the term for this source as a column family
+        sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
+      } else {
+        // Seek only to the term for this source as a column family
+        sources[i].iter.seek(range, sources[i].seekColfams, true);
+      }
+    }
+    advanceToIntersection();
+  }
+  
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
+    // Check if we have space for the added Source
+    if (sources == null) {
+      sources = new TermSource[1];
+    } else {
+      // allocate space for the new source and copy the existing sources.
+      // TODO: Should we change this to an ArrayList so that we can just add() ?
+      TermSource[] localSources = new TermSource[sources.length + 1];
+      int currSource = 0;
+      for (TermSource myTerm : sources) {
+        // TODO: Do I need to call new here? or can I just re-use the term?
+        localSources[currSource] = new TermSource(myTerm);
+        currSource++;
+      }
+      sources = localSources;
+    }
+    sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
+    sourcesCount++;
+  }
+  
+  /**
+   * Encode the columns to be used when iterating.
+   * 
+   * @param cfg
+   *          IteratorSetting object to configure
+   * @param columns
+   *          the column families (terms) to intersect
+   */
+  public static void setColumnFamilies(IteratorSetting cfg, Text[] columns) {
+    if (columns.length < 2)
+      throw new IllegalArgumentException("Must supply at least two terms to intersect");
+    cfg.addOption(IntersectingIterator.columnFamiliesOptionName, IntersectingIterator.encodeColumns(columns));
+  }
+  
+  /**
+   * Encode columns and NOT flags indicating which columns should be negated (docIDs will be excluded if matching negated columns, instead of included).
+   * 
+   * @param cfg
+   *          IteratorSetting object to configure
+   * @param columns
+   *          the column families (terms) to intersect
+   * @param notFlags
+   *          flags indicating which of the corresponding terms are negated
+   */
+  public static void setColumnFamilies(IteratorSetting cfg, Text[] columns, boolean[] notFlags) {
+    if (columns.length < 2)
+      throw new IllegalArgumentException("Must supply at least two terms to intersect");
+    if (columns.length != notFlags.length)
+      throw new IllegalArgumentException("columns and notFlags arrays must be the same length");
+    setColumnFamilies(cfg, columns);
+    cfg.addOption(IntersectingIterator.notFlagOptionName, IntersectingIterator.encodeBooleans(notFlags));
+  }
+}
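
A comparable sketch for the plain IntersectingIterator, including a negated term via the notFlags overload (same hypothetical "conn" and "shardTable" as above); README.shard in docs/examples has the full worked example:

    Text[] terms = {new Text("rock"), new Text("paper")};
    boolean[] notFlags = {false, true}; // exclude docs containing "paper"
    IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
    IntersectingIterator.setColumnFamilies(ii, terms, notFlags);
    BatchScanner bs = conn.createBatchScanner("shardTable", new Authorizations(), 10);
    bs.setRanges(Collections.singleton(new Range()));
    bs.addScanIterator(ii);
    for (Map.Entry<Key,Value> entry : bs) {
      // matching docIDs come back in the column qualifier; the column family is empty
      System.out.println(entry.getKey().getColumnQualifier());
    }
    bs.close();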

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/LargeRowFilter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/LargeRowFilter.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/LargeRowFilter.java
new file mode 100644
index 0000000..f0c5932
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/LargeRowFilter.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This iterator suppresses rows that exceed a specified number of columns. Once a row exceeds the threshold, a marker is emitted and the row is always
+ * suppressed by this iterator after that point in time.
+ * 
+ * This iterator works in a similar way to the RowDeletingIterator. See its javadoc about locality groups.
+ */
+public class LargeRowFilter implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  public static final Value SUPPRESS_ROW_VALUE = new Value("SUPPRESS_ROW".getBytes());
+  
+  private static final ByteSequence EMPTY = new ArrayByteSequence(new byte[] {});
+  
+  /* key for the options map; the value is the row suppression threshold (maxColumns) */
+  private static final String MAX_COLUMNS = "max_columns";
+  
+  private SortedKeyValueIterator<Key,Value> source;
+  
+  // a cache of keys
+  private ArrayList<Key> keys = new ArrayList<Key>();
+  private ArrayList<Value> values = new ArrayList<Value>();
+  
+  private int currentPosition;
+  
+  private int maxColumns;
+  
+  private boolean propagateSuppression = false;
+  
+  private Range range;
+  private Collection<ByteSequence> columnFamilies;
+  private boolean inclusive;
+  private boolean dropEmptyColFams;
+  
+  private boolean isSuppressionMarker(Key key, Value val) {
+    return key.getColumnFamilyData().length() == 0 && key.getColumnQualifierData().length() == 0 && key.getColumnVisibilityData().length() == 0
+        && val.equals(SUPPRESS_ROW_VALUE);
+  }
+  
+  private void reseek(Key key) throws IOException {
+    if (range.afterEndKey(key)) {
+      range = new Range(range.getEndKey(), true, range.getEndKey(), range.isEndKeyInclusive());
+      source.seek(range, columnFamilies, inclusive);
+    } else {
+      range = new Range(key, true, range.getEndKey(), range.isEndKeyInclusive());
+      source.seek(range, columnFamilies, inclusive);
+    }
+  }
+  
+  private void consumeRow(ByteSequence row) throws IOException {
+    // try reading a few entries; if we are still not past the row, seek past it
+    int count = 0;
+    
+    while (source.hasTop() && source.getTopKey().getRowData().equals(row)) {
+      source.next();
+      count++;
+      if (count >= 10) {
+        Key nextRowStart = new Key(new Text(row.toArray())).followingKey(PartialKey.ROW);
+        reseek(nextRowStart);
+        count = 0;
+      }
+    }
+  }
+  
+  private void addKeyValue(Key k, Value v) {
+    if (dropEmptyColFams && k.getColumnFamilyData().equals(EMPTY)) {
+      return;
+    }
+    keys.add(new Key(k));
+    values.add(new Value(v));
+  }
+  
+  private void bufferNextRow() throws IOException {
+    
+    keys.clear();
+    values.clear();
+    currentPosition = 0;
+    
+    while (source.hasTop() && keys.size() == 0) {
+      
+      addKeyValue(source.getTopKey(), source.getTopValue());
+      
+      if (isSuppressionMarker(source.getTopKey(), source.getTopValue())) {
+        
+        consumeRow(source.getTopKey().getRowData());
+        
+      } else {
+        
+        ByteSequence currentRow = keys.get(0).getRowData();
+        source.next();
+        
+        while (source.hasTop() && source.getTopKey().getRowData().equals(currentRow)) {
+          
+          addKeyValue(source.getTopKey(), source.getTopValue());
+          
+          if (keys.size() > maxColumns) {
+            keys.clear();
+            values.clear();
+            
+            // when the row is too big, just emit a suppression
+            // marker
+            addKeyValue(new Key(new Text(currentRow.toArray())), SUPPRESS_ROW_VALUE);
+            consumeRow(currentRow);
+          } else {
+            source.next();
+          }
+        }
+      }
+      
+    }
+  }
+  
+  private void readNextRow() throws IOException {
+    
+    bufferNextRow();
+    
+    while (!propagateSuppression && currentPosition < keys.size() && isSuppressionMarker(keys.get(0), values.get(0))) {
+      bufferNextRow();
+    }
+  }
+  
+  private LargeRowFilter(SortedKeyValueIterator<Key,Value> source, boolean propagateSuppression, int maxColumns) {
+    this.source = source;
+    this.propagateSuppression = propagateSuppression;
+    this.maxColumns = maxColumns;
+  }
+  
+  public LargeRowFilter() {}
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    this.source = source;
+    this.maxColumns = Integer.parseInt(options.get(MAX_COLUMNS));
+    this.propagateSuppression = env.getIteratorScope() != IteratorScope.scan;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return currentPosition < keys.size();
+  }
+  
+  @Override
+  public void next() throws IOException {
+    
+    if (currentPosition >= keys.size()) {
+      throw new IllegalStateException("Called next() when hasTop() is false");
+    }
+    
+    currentPosition++;
+    
+    if (currentPosition == keys.size()) {
+      readNextRow();
+    }
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    
+    if (inclusive && !columnFamilies.contains(EMPTY)) {
+      columnFamilies = new HashSet<ByteSequence>(columnFamilies);
+      columnFamilies.add(EMPTY);
+      dropEmptyColFams = true;
+    } else if (!inclusive && columnFamilies.contains(EMPTY)) {
+      columnFamilies = new HashSet<ByteSequence>(columnFamilies);
+      columnFamilies.remove(EMPTY);
+      dropEmptyColFams = true;
+    } else {
+      dropEmptyColFams = false;
+    }
+    
+    this.range = range;
+    this.columnFamilies = columnFamilies;
+    this.inclusive = inclusive;
+    
+    if (range.getStartKey() != null) {
+      // seek to beginning of row to see if there is a suppression marker
+      Range newRange = new Range(new Key(range.getStartKey().getRow()), true, range.getEndKey(), range.isEndKeyInclusive());
+      source.seek(newRange, columnFamilies, inclusive);
+      
+      readNextRow();
+      
+      // it is possible that all or some of the data read for the current
+      // row is before the start of the range
+      while (currentPosition < keys.size() && range.beforeStartKey(keys.get(currentPosition)))
+        currentPosition++;
+      
+      if (currentPosition == keys.size())
+        readNextRow();
+      
+    } else {
+      source.seek(range, columnFamilies, inclusive);
+      readNextRow();
+    }
+    
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return keys.get(currentPosition);
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return values.get(currentPosition);
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new LargeRowFilter(source.deepCopy(env), propagateSuppression, maxColumns);
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    String description = "This iterator suppresses rows that exceed a specified number of columns. Once\n"
+        + "a row exceeds the threshold, a marker is emitted and the row is always\n" + "suppressed by this iterator after that point in time.\n"
+        + " This iterator works in a similar way to the RowDeletingIterator. See its\n" + " javadoc about locality groups.\n";
+    return new IteratorOptions(this.getClass().getSimpleName(), description, Collections.singletonMap(MAX_COLUMNS, "Number Of Columns To Begin Suppression"),
+        null);
+  }
+  
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    if (options == null || options.size() < 1) {
+      System.out.println("Bad # of options, must supply: " + MAX_COLUMNS + " as value");
+      return false;
+    }
+    
+    if (options.containsKey(MAX_COLUMNS)) {
+      try {
+        maxColumns = Integer.parseInt(options.get(MAX_COLUMNS));
+      } catch (NumberFormatException e) {
+        e.printStackTrace();
+        return false;
+      }
+    } else {
+      System.out.println("Need to have " + MAX_COLUMNS);
+      return false;
+    }
+    
+    return true;
+  }
+  
+  /**
+   * A convenience method for setting the maximum number of columns to keep.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param maxColumns
+   *          number of columns to keep.
+   */
+  public static void setMaxColumns(IteratorSetting is, int maxColumns) {
+    is.addOption(MAX_COLUMNS, Integer.toString(maxColumns));
+  }
+}
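
A sketch of attaching the filter to a table (hypothetical table name; attachIterator applies it to all scopes by default, so suppression markers are also emitted and propagated during compactions, as coded above):

    IteratorSetting is = new IteratorSetting(30, "lrf", LargeRowFilter.class);
    LargeRowFilter.setMaxColumns(is, 10000); // suppress rows wider than 10,000 columns
    conn.tableOperations().attachIterator("myTable", is);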

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MaxCombiner.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MaxCombiner.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MaxCombiner.java
new file mode 100644
index 0000000..1b5a4e2
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MaxCombiner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.iterators.LongCombiner;
+
+/**
+ * A Combiner that interprets Values as Longs and returns the largest Long among them.
+ */
+public class MaxCombiner extends LongCombiner {
+  @Override
+  public Long typedReduce(Key key, Iterator<Long> iter) {
+    long max = Long.MIN_VALUE;
+    while (iter.hasNext()) {
+      Long l = iter.next();
+      if (l > max)
+        max = l;
+    }
+    return max;
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("max");
+    io.setDescription("MaxCombiner interprets Values as Longs and finds their maximum.  A variety of encodings (variable length, fixed length, or string) are available");
+    return io;
+  }
+}
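
A sketch of configuring this combiner (hypothetical table and column names; the encoding setter comes from LongCombiner and the column selection from Combiner):

    IteratorSetting is = new IteratorSetting(10, "max", MaxCombiner.class);
    LongCombiner.setEncodingType(is, LongCombiner.Type.STRING); // values stored as decimal strings
    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("stats", "high")));
    conn.tableOperations().attachIterator("myTable", is);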

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MinCombiner.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MinCombiner.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MinCombiner.java
new file mode 100644
index 0000000..891806e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/MinCombiner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.iterators.LongCombiner;
+
+/**
+ * A Combiner that interprets Values as Longs and returns the smallest Long among them.
+ */
+public class MinCombiner extends LongCombiner {
+  @Override
+  public Long typedReduce(Key key, Iterator<Long> iter) {
+    long min = Long.MAX_VALUE;
+    while (iter.hasNext()) {
+      Long l = iter.next();
+      if (l < min)
+        min = l;
+    }
+    return min;
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("min");
+    io.setDescription("MinCombiner interprets Values as Longs and finds their minimum.  A variety of encodings (variable length, fixed length, or string) are available");
+    return io;
+  }
+}
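
Configuration mirrors MaxCombiner; for example, tracking a minimum across all columns (again a sketch with hypothetical names):

    IteratorSetting is = new IteratorSetting(10, "min", MinCombiner.class);
    LongCombiner.setEncodingType(is, LongCombiner.Type.VARLEN);
    Combiner.setCombineAllColumns(is, true); // apply to every column instead of a fixed list
    conn.tableOperations().attachIterator("myTable", is);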

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RegExFilter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RegExFilter.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RegExFilter.java
new file mode 100644
index 0000000..e508631
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RegExFilter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A Filter that matches entries based on Java regular expressions.
+ */
+public class RegExFilter extends Filter {
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    RegExFilter result = (RegExFilter) super.deepCopy(env);
+    result.rowMatcher = copyMatcher(rowMatcher);
+    result.colfMatcher = copyMatcher(colfMatcher);
+    result.colqMatcher = copyMatcher(colqMatcher);
+    result.valueMatcher = copyMatcher(valueMatcher);
+    result.orFields = orFields;
+    return result;
+  }
+  
+  public static final String ROW_REGEX = "rowRegex";
+  public static final String COLF_REGEX = "colfRegex";
+  public static final String COLQ_REGEX = "colqRegex";
+  public static final String VALUE_REGEX = "valueRegex";
+  public static final String OR_FIELDS = "orFields";
+  public static final String ENCODING = "encoding";
+  
+  public static final String ENCODING_DEFAULT = "UTF-8";
+  
+  private Matcher rowMatcher;
+  private Matcher colfMatcher;
+  private Matcher colqMatcher;
+  private Matcher valueMatcher;
+  private boolean orFields = false;
+  
+  private String encoding = ENCODING_DEFAULT;
+  
+  private Matcher copyMatcher(Matcher m) {
+    if (m == null)
+      return m;
+    else
+      return m.pattern().matcher("");
+  }
+  
+  private boolean matches(Matcher matcher, ByteSequence bs) {
+    if (matcher != null) {
+      try {
+        matcher.reset(new String(bs.getBackingArray(), bs.offset(), bs.length(), encoding));
+        return matcher.matches();
+      } catch (UnsupportedEncodingException e) {
+        e.printStackTrace();
+      }
+    }
+    return !orFields;
+  }
+  
+  private boolean matches(Matcher matcher, byte[] data, int offset, int len) {
+    if (matcher != null) {
+      try {
+        matcher.reset(new String(data, offset, len, encoding));
+        return matcher.matches();
+      } catch (UnsupportedEncodingException e) {
+        e.printStackTrace();
+      }
+    }
+    return !orFields;
+  }
+  
+  @Override
+  public boolean accept(Key key, Value value) {
+    if (orFields)
+      return matches(rowMatcher, key.getRowData()) || matches(colfMatcher, key.getColumnFamilyData()) || matches(colqMatcher, key.getColumnQualifierData())
+          || matches(valueMatcher, value.get(), 0, value.get().length);
+    return matches(rowMatcher, key.getRowData()) && matches(colfMatcher, key.getColumnFamilyData()) && matches(colqMatcher, key.getColumnQualifierData())
+        && matches(valueMatcher, value.get(), 0, value.get().length);
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    if (options.containsKey(ROW_REGEX)) {
+      rowMatcher = Pattern.compile(options.get(ROW_REGEX)).matcher("");
+    } else {
+      rowMatcher = null;
+    }
+    
+    if (options.containsKey(COLF_REGEX)) {
+      colfMatcher = Pattern.compile(options.get(COLF_REGEX)).matcher("");
+    } else {
+      colfMatcher = null;
+    }
+    
+    if (options.containsKey(COLQ_REGEX)) {
+      colqMatcher = Pattern.compile(options.get(COLQ_REGEX)).matcher("");
+    } else {
+      colqMatcher = null;
+    }
+    
+    if (options.containsKey(VALUE_REGEX)) {
+      valueMatcher = Pattern.compile(options.get(VALUE_REGEX)).matcher("");
+    } else {
+      valueMatcher = null;
+    }
+    
+    if (options.containsKey(OR_FIELDS)) {
+      orFields = Boolean.parseBoolean(options.get(OR_FIELDS));
+    } else {
+      orFields = false;
+    }
+    
+    if (options.containsKey(ENCODING)) {
+      encoding = options.get(ENCODING);
+    }
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("regex");
+    io.setDescription("The RegExFilter/Iterator allows you to filter for key/value pairs based on regular expressions");
+    io.addNamedOption(RegExFilter.ROW_REGEX, "regular expression on row");
+    io.addNamedOption(RegExFilter.COLF_REGEX, "regular expression on column family");
+    io.addNamedOption(RegExFilter.COLQ_REGEX, "regular expression on column qualifier");
+    io.addNamedOption(RegExFilter.VALUE_REGEX, "regular expression on value");
+    io.addNamedOption(RegExFilter.OR_FIELDS, "use OR instead of AND when multiple regexes are given");
+    io.addNamedOption(RegExFilter.ENCODING, "character encoding of byte array value (default is " + ENCODING_DEFAULT + ")");
+    return io;
+  }
+  
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    super.validateOptions(options);
+    if (options.containsKey(ROW_REGEX))
+      Pattern.compile(options.get(ROW_REGEX)).matcher("");
+    
+    if (options.containsKey(COLF_REGEX))
+      Pattern.compile(options.get(COLF_REGEX)).matcher("");
+    
+    if (options.containsKey(COLQ_REGEX))
+      Pattern.compile(options.get(COLQ_REGEX)).matcher("");
+    
+    if (options.containsKey(VALUE_REGEX))
+      Pattern.compile(options.get(VALUE_REGEX)).matcher("");
+    
+    if (options.containsKey(ENCODING)) {
+      try {
+        this.encoding = options.get(ENCODING);
+        @SuppressWarnings("unused")
+        String test = new String("test".getBytes(), encoding);
+      } catch (UnsupportedEncodingException e) {
+        e.printStackTrace();
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  /**
+   * Encode the terms to match against in the iterator
+   * 
+   * @param si
+   *          ScanIterator config to be updated
+   * @param rowTerm
+   *          the pattern to match against the Key's row. Not used if null.
+   * @param cfTerm
+   *          the pattern to match against the Key's column family. Not used if null.
+   * @param cqTerm
+   *          the pattern to match against the Key's column qualifier. Not used if null.
+   * @param valueTerm
+   *          the pattern to match against the entry's Value. Not used if null.
+   * @param orFields
+   *          if true, any of the non-null terms can match to return the entry
+   */
+  public static void setRegexs(IteratorSetting si, String rowTerm, String cfTerm, String cqTerm, String valueTerm, boolean orFields) {
+    if (rowTerm != null)
+      si.addOption(RegExFilter.ROW_REGEX, rowTerm);
+    if (cfTerm != null)
+      si.addOption(RegExFilter.COLF_REGEX, cfTerm);
+    if (cqTerm != null)
+      si.addOption(RegExFilter.COLQ_REGEX, cqTerm);
+    if (valueTerm != null)
+      si.addOption(RegExFilter.VALUE_REGEX, valueTerm);
+    if (orFields) {
+      si.addOption(RegExFilter.OR_FIELDS, "true");
+    }
+  }
+  
+  /**
+   * Set the encoding string to use when interpreting characters
+   * 
+   * @param si
+   *          ScanIterator config to be updated
+   * @param encoding
+   *          the encoding string to use for character interpretation.
+   * 
+   */
+  public static void setEncoding(IteratorSetting si, String encoding) {
+    if (!encoding.isEmpty()) {
+      si.addOption(RegExFilter.ENCODING, encoding);
+    }
+  }
+}
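
A minimal usage sketch for the filter above (assuming an existing Scanner named scanner; the priority, name, and regular expressions are illustrative):

    IteratorSetting is = new IteratorSetting(50, "regexFilter", RegExFilter.class);
    // keep entries whose row starts with "2016" OR whose value contains "error"
    RegExFilter.setRegexs(is, "2016.*", null, null, ".*error.*", true);
    scanner.addScanIterator(is);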

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/ReqVisFilter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/ReqVisFilter.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/ReqVisFilter.java
new file mode 100644
index 0000000..754b2c3
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/ReqVisFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.security.ColumnVisibility;
+
+/**
+ * A Filter that matches entries with a non-empty ColumnVisibility.
+ */
+public class ReqVisFilter extends Filter {
+  
+  @Override
+  public boolean accept(Key k, Value v) {
+    ColumnVisibility vis = new ColumnVisibility(k.getColumnVisibility());
+    return vis.getExpression().length > 0;
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("reqvis");
+    io.setDescription("ReqVisFilter hides entries without a visibility label");
+    return io;
+  }
+}
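
ReqVisFilter takes no options, so attaching it to a scan is a one-liner; a sketch assuming an existing Scanner named scanner (priority and name are illustrative):

    scanner.addScanIterator(new IteratorSetting(50, "reqvis", ReqVisFilter.class));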

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowDeletingIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowDeletingIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowDeletingIterator.java
new file mode 100644
index 0000000..b139096
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowDeletingIterator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * An iterator for deleting whole rows.
+ * 
+ * After setting this iterator up for your table, delete a row by inserting an entry with an empty column family, empty column qualifier, empty column
+ * visibility, and a value of DEL_ROW. Do not use empty columns for anything else when using this iterator.
+ * 
+ * When using this iterator, the locality group containing the empty column family will always be read, because that locality group is the one that holds the
+ * row deletes. Always reading this locality group can have an impact on performance.
+ * 
+ * For example, assume there are two locality groups, one containing large images and one containing small metadata about the images. If row deletes are in
+ * the same locality group as the images, then scans and major compactions that only read the metadata locality group will be significantly slowed down.
+ * Therefore, you would want to put the empty column family in the locality group that contains the metadata. Another option is to put the empty column family
+ * in its own locality group. Which is best depends on your data.
+ * 
+ */
+
+public class RowDeletingIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  public static final Value DELETE_ROW_VALUE = new Value("DEL_ROW".getBytes());
+  private SortedKeyValueIterator<Key,Value> source;
+  private boolean propogateDeletes;
+  private ByteSequence currentRow;
+  private boolean currentRowDeleted;
+  private long deleteTS;
+  
+  private boolean dropEmptyColFams;
+  
+  private static final ByteSequence EMPTY = new ArrayByteSequence(new byte[] {});
+  
+  private RowDeletingIterator(SortedKeyValueIterator<Key,Value> source, boolean propogateDeletes2) {
+    this.source = source;
+    this.propogateDeletes = propogateDeletes2;
+  }
+  
+  public RowDeletingIterator() {}
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new RowDeletingIterator(source.deepCopy(env), propogateDeletes);
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return source.getTopValue();
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return source.hasTop();
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    this.source = source;
+    this.propogateDeletes = (env.getIteratorScope() == IteratorScope.majc && !env.isFullMajorCompaction()) || env.getIteratorScope() == IteratorScope.minc;
+  }
+  
+  @Override
+  public void next() throws IOException {
+    source.next();
+    consumeDeleted();
+    consumeEmptyColFams();
+  }
+  
+  private void consumeEmptyColFams() throws IOException {
+    while (dropEmptyColFams && source.hasTop() && source.getTopKey().getColumnFamilyData().length() == 0) {
+      source.next();
+      consumeDeleted();
+    }
+  }
+  
+  private boolean isDeleteMarker(Key key, Value val) {
+    return key.getColumnFamilyData().length() == 0 && key.getColumnQualifierData().length() == 0 && key.getColumnVisibilityData().length() == 0
+        && val.equals(DELETE_ROW_VALUE);
+  }
+  
+  private void consumeDeleted() throws IOException {
+    // this method tries to do as little work as possible when nothing is deleted
+    while (source.hasTop()) {
+      if (currentRowDeleted) {
+        while (source.hasTop() && currentRow.equals(source.getTopKey().getRowData()) && source.getTopKey().getTimestamp() <= deleteTS) {
+          source.next();
+        }
+        
+        if (source.hasTop() && !currentRow.equals(source.getTopKey().getRowData())) {
+          currentRowDeleted = false;
+        }
+      }
+      
+      if (!currentRowDeleted && source.hasTop() && isDeleteMarker(source.getTopKey(), source.getTopValue())) {
+        currentRow = source.getTopKey().getRowData();
+        currentRowDeleted = true;
+        deleteTS = source.getTopKey().getTimestamp();
+        
+        if (propogateDeletes)
+          break;
+      } else {
+        break;
+      }
+    }
+    
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    
+    if (inclusive && !columnFamilies.contains(EMPTY)) {
+      columnFamilies = new HashSet<ByteSequence>(columnFamilies);
+      columnFamilies.add(EMPTY);
+      dropEmptyColFams = true;
+    } else if (!inclusive && columnFamilies.contains(EMPTY)) {
+      columnFamilies = new HashSet<ByteSequence>(columnFamilies);
+      columnFamilies.remove(EMPTY);
+      dropEmptyColFams = true;
+    } else {
+      dropEmptyColFams = false;
+    }
+    
+    currentRowDeleted = false;
+    
+    if (range.getStartKey() != null) {
+      // seek to beginning of row
+      Range newRange = new Range(new Key(range.getStartKey().getRow()), true, range.getEndKey(), range.isEndKeyInclusive());
+      source.seek(newRange, columnFamilies, inclusive);
+      consumeDeleted();
+      consumeEmptyColFams();
+      
+      if (source.hasTop() && range.beforeStartKey(source.getTopKey())) {
+        source.seek(range, columnFamilies, inclusive);
+        consumeDeleted();
+        consumeEmptyColFams();
+      }
+    } else {
+      source.seek(range, columnFamilies, inclusive);
+      consumeDeleted();
+      consumeEmptyColFams();
+    }
+    
+  }
+  
+}
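
A minimal sketch of using the iterator above (assuming an existing Connector named connector; the table name, row, and priority are illustrative):

    // attach at all scopes so row deletes are honored during scans and compactions
    IteratorSetting is = new IteratorSetting(30, "rowDelete", RowDeletingIterator.class);
    connector.tableOperations().attachIterator("mytable", is);

    // delete row "row1" by inserting the empty-column marker described in the javadoc
    BatchWriter writer = connector.createBatchWriter("mytable", new BatchWriterConfig());
    Mutation m = new Mutation(new Text("row1"));
    m.put(new Text(""), new Text(""), RowDeletingIterator.DELETE_ROW_VALUE);
    writer.addMutation(m);
    writer.close();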

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
new file mode 100644
index 0000000..1811734
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This iterator makes it easy to select rows that meet given criteria. It is an alternative to the {@link WholeRowIterator}. There are a few things to
+ * consider when deciding which one to use.
+ * 
+ * First, the WholeRowIterator requires that the row fit in memory and that the entire row be read before a decision is made. This iterator has neither
+ * requirement; it allows seeking within a row, so a decision can be made without reading the entire row. So even if your rows fit into memory, extending this
+ * iterator may be the better choice because you can seek.
+ * 
+ * Second, the WholeRowIterator is currently the only way to achieve row isolation with the {@link BatchScanner}. With the normal {@link Scanner}, row
+ * isolation can be enabled and this iterator may be used.
+ * 
+ * Third, the row acceptance test will be executed every time this iterator is seeked. If the row is large, then the row will be fetched in batches of
+ * key/values. As each batch is fetched, the test may be re-executed because the iterator stack is reseeked for each batch. The batch size may be increased to
+ * reduce the number of times the test is executed. With the normal Scanner, if isolation is enabled then an entire row is read without seeking this iterator.
+ * 
+ */
+public abstract class RowFilter extends WrappingIterator {
+  
+  private RowIterator decisionIterator;
+  private Collection<ByteSequence> columnFamilies;
+  Text currentRow;
+  private boolean inclusive;
+  private Range range;
+  private boolean hasTop;
+
+  private static class RowIterator extends WrappingIterator {
+    private Range rowRange;
+    private boolean hasTop;
+    
+    RowIterator(SortedKeyValueIterator<Key,Value> source) {
+      super.setSource(source);
+    }
+    
+    void setRow(Range row) {
+      this.rowRange = row;
+    }
+    
+    @Override
+    public boolean hasTop() {
+      return hasTop && super.hasTop();
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      
+      range = rowRange.clip(range, true);
+      if (range == null) {
+        hasTop = false;
+      } else {
+        hasTop = true;
+        super.seek(range, columnFamilies, inclusive);
+      }
+    }
+  }
+
+  private void skipRows() throws IOException {
+    SortedKeyValueIterator<Key,Value> source = getSource();
+    while (source.hasTop()) {
+      Text row = source.getTopKey().getRow();
+      
+      if (currentRow != null && currentRow.equals(row))
+        break;
+      
+      Range rowRange = new Range(row);
+      decisionIterator.setRow(rowRange);
+      decisionIterator.seek(rowRange, columnFamilies, inclusive);
+      
+      if (acceptRow(decisionIterator)) {
+        currentRow = row;
+        break;
+      } else {
+        currentRow = null;
+        int count = 0;
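+        // consume up to 10 entries with next() before falling back to a more expensive seek past the row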
+        while (source.hasTop() && count < 10 && source.getTopKey().getRow().equals(row)) {
+          count++;
+          source.next();
+        }
+        
+        if (source.hasTop() && source.getTopKey().getRow().equals(row)) {
+          Range nextRow = new Range(row, false, null, false);
+          nextRow = range.clip(nextRow, true);
+          if (nextRow == null)
+            hasTop = false;
+          else
+            source.seek(nextRow, columnFamilies, inclusive);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Implementation should return false to suppress a row.
+   * 
+   * 
+   * @param rowIterator
+   *          - An iterator over the row. This iterator is confined to the row. Seeking past the end of the row will return no data. Seeking before the row will
+   *          always set top to the first column in the current row. By default this iterator will only see the columns the parent was seeked with. To see more
+   *          columns reseek this iterator with those columns.
+   * @return false if a row should be suppressed, otherwise true.
+   * @throws IOException
+   */
+  public abstract boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    this.decisionIterator = new RowIterator(source.deepCopy(env));
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return hasTop && super.hasTop();
+  }
+  
+  @Override
+  public void next() throws IOException {
+    super.next();
+    skipRows();
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    this.columnFamilies = columnFamilies;
+    this.inclusive = inclusive;
+    this.range = range;
+    currentRow = null;
+    hasTop = true;
+    skipRows();
+    
+  }
+}
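
To use RowFilter, extend it and implement acceptRow. A minimal sketch of a subclass that keeps only rows containing at least one entry in a hypothetical "metadata" column family:

    public class MetadataRowFilter extends RowFilter {
      @Override
      public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException {
        // accept the row as soon as one "metadata" entry is seen; no need to read the rest
        while (rowIterator.hasTop()) {
          if (rowIterator.getTopKey().getColumnFamily().toString().equals("metadata"))
            return true;
          rowIterator.next();
        }
        return false;
      }
    }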

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingArrayCombiner.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingArrayCombiner.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingArrayCombiner.java
new file mode 100644
index 0000000..236b42e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingArrayCombiner.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.LongCombiner;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.TypedValueCombiner;
+import org.apache.accumulo.core.iterators.ValueFormatException;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A Combiner that interprets Values as arrays of Longs and returns an array of element-wise sums.
+ */
+public class SummingArrayCombiner extends TypedValueCombiner<List<Long>> {
+  public static final Encoder<List<Long>> FIXED_LONG_ARRAY_ENCODER = new FixedLongArrayEncoder();
+  public static final Encoder<List<Long>> VAR_LONG_ARRAY_ENCODER = new VarLongArrayEncoder();
+  public static final Encoder<List<Long>> STRING_ARRAY_ENCODER = new StringArrayEncoder();
+  
+  private static final String TYPE = "type";
+  private static final String CLASS_PREFIX = "class:";
+  
+  public static enum Type {
+    /**
+     * indicates a variable-length encoding of a list of Longs using {@link SummingArrayCombiner.VarLongArrayEncoder}
+     */
+    VARLEN,
+    /**
+     * indicates a fixed-length (8 bytes for each Long) encoding of a list of Longs using {@link SummingArrayCombiner.FixedLongArrayEncoder}
+     */
+    FIXEDLEN,
+    /**
+     * indicates a string (comma-separated) representation of a list of Longs using {@link SummingArrayCombiner.StringArrayEncoder}
+     */
+    STRING
+  }
+  
+  @Override
+  public List<Long> typedReduce(Key key, Iterator<List<Long>> iter) {
+    List<Long> sum = new ArrayList<Long>();
+    while (iter.hasNext()) {
+      sum = arrayAdd(sum, iter.next());
+    }
+    return sum;
+  }
+  
+  public static List<Long> arrayAdd(List<Long> la, List<Long> lb) {
+    if (la.size() > lb.size()) {
+      for (int i = 0; i < lb.size(); i++) {
+        la.set(i, LongCombiner.safeAdd(la.get(i), lb.get(i)));
+      }
+      return la;
+    } else {
+      for (int i = 0; i < la.size(); i++) {
+        lb.set(i, LongCombiner.safeAdd(lb.get(i), la.get(i)));
+      }
+      return lb;
+    }
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    setEncoder(options);
+  }
+  
+  private void setEncoder(Map<String,String> options) {
+    String type = options.get(TYPE);
+    if (type == null)
+      throw new IllegalArgumentException("no type specified");
+    if (type.startsWith(CLASS_PREFIX)) {
+      setEncoder(type.substring(CLASS_PREFIX.length()));
+      testEncoder(Arrays.asList(0L, 1L));
+    } else {
+      switch (Type.valueOf(options.get(TYPE))) {
+        case VARLEN:
+          setEncoder(VAR_LONG_ARRAY_ENCODER);
+          return;
+        case FIXEDLEN:
+          setEncoder(FIXED_LONG_ARRAY_ENCODER);
+          return;
+        case STRING:
+          setEncoder(STRING_ARRAY_ENCODER);
+          return;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("sumarray");
+    io.setDescription("SummingArrayCombiner can interpret Values as arrays of Longs using a variety of encodings (arrays of variable length longs or fixed length longs, or comma-separated strings) before summing element-wise.");
+    io.addNamedOption(TYPE, "<VARLEN|FIXEDLEN|STRING|fullClassName>");
+    return io;
+  }
+  
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    super.validateOptions(options);
+    setEncoder(options);
+    return true;
+  }
+  
+  public abstract static class DOSArrayEncoder<V> implements Encoder<List<V>> {
+    public abstract void write(DataOutputStream dos, V v) throws IOException;
+    
+    public abstract V read(DataInputStream dis) throws IOException;
+    
+    @Override
+    public byte[] encode(List<V> vl) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      try {
+        WritableUtils.writeVInt(dos, vl.size());
+        for (V v : vl) {
+          write(dos, v);
+        }
+      } catch (IOException e) {
+        throw new NumberFormatException(e.getMessage());
+      }
+      return baos.toByteArray();
+    }
+    
+    @Override
+    public List<V> decode(byte[] b) {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
+      try {
+        int len = WritableUtils.readVInt(dis);
+        List<V> vl = new ArrayList<V>(len);
+        for (int i = 0; i < len; i++) {
+          vl.add(read(dis));
+        }
+        return vl;
+      } catch (IOException e) {
+        throw new ValueFormatException(e);
+      }
+    }
+  }
+  
+  public static class VarLongArrayEncoder extends DOSArrayEncoder<Long> {
+    @Override
+    public void write(DataOutputStream dos, Long v) throws IOException {
+      WritableUtils.writeVLong(dos, v);
+    }
+    
+    @Override
+    public Long read(DataInputStream dis) throws IOException {
+      return WritableUtils.readVLong(dis);
+    }
+  }
+  
+  public static class FixedLongArrayEncoder extends DOSArrayEncoder<Long> {
+    @Override
+    public void write(DataOutputStream dos, Long v) throws IOException {
+      dos.writeLong(v);
+    }
+    
+    @Override
+    public Long read(DataInputStream dis) throws IOException {
+      return dis.readLong();
+    }
+  }
+  
+  public static class StringArrayEncoder implements Encoder<List<Long>> {
+    @Override
+    public byte[] encode(List<Long> la) {
+      if (la.size() == 0)
+        return new byte[] {};
+      StringBuilder sb = new StringBuilder(Long.toString(la.get(0)));
+      for (int i = 1; i < la.size(); i++) {
+        sb.append(",");
+        sb.append(Long.toString(la.get(i)));
+      }
+      return sb.toString().getBytes();
+    }
+    
+    @Override
+    public List<Long> decode(byte[] b) {
+      String[] longstrs = new String(b).split(",");
+      List<Long> la = new ArrayList<Long>(longstrs.length);
+      for (String s : longstrs) {
+        if (s.length() == 0)
+          la.add(0L);
+        else
+          try {
+            la.add(Long.parseLong(s));
+          } catch (NumberFormatException nfe) {
+            throw new ValueFormatException(nfe);
+          }
+      }
+      return la;
+    }
+  }
+  
+  /**
+   * A convenience method for setting the encoding type.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param type
+   *          SummingArrayCombiner.Type specifying the encoding type.
+   */
+  public static void setEncodingType(IteratorSetting is, Type type) {
+    is.addOption(TYPE, type.toString());
+  }
+  
+  /**
+   * A convenience method for setting the encoding type.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param encoderClass
+   *          Class<? extends Encoder<List<Long>>> specifying the encoding type.
+   */
+  public static void setEncodingType(IteratorSetting is, Class<? extends Encoder<List<Long>>> encoderClass) {
+    is.addOption(TYPE, CLASS_PREFIX + encoderClass.getName());
+  }
+  
+  /**
+   * A convenience method for setting the encoding type.
+   * 
+   * @param is
+   *          IteratorSetting object to configure.
+   * @param encoderClassName
+   *          name of a class specifying the encoding type.
+   */
+  public static void setEncodingType(IteratorSetting is, String encoderClassName) {
+    is.addOption(TYPE, CLASS_PREFIX + encoderClassName);
+  }
+}
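
A minimal configuration sketch for the combiner above (assuming an existing Connector named connector; the table and column family are illustrative):

    IteratorSetting is = new IteratorSetting(10, "sumarray", SummingArrayCombiner.class);
    SummingArrayCombiner.setEncodingType(is, SummingArrayCombiner.Type.VARLEN);
    // combine only the "stats" column family
    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("stats")));
    connector.tableOperations().attachIterator("mytable", is);

Per arrayAdd above, combining values that encode [1, 2, 3] and [10, 20] yields [11, 22, 3]; the shorter array is treated as if padded with zeros.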

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingCombiner.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingCombiner.java b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingCombiner.java
new file mode 100644
index 0000000..6841bac
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/iterators/user/SummingCombiner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.iterators.LongCombiner;
+
+/**
+ * A Combiner that interprets Values as Longs and returns their sum.
+ */
+public class SummingCombiner extends LongCombiner {
+  @Override
+  public Long typedReduce(Key key, Iterator<Long> iter) {
+    long sum = 0;
+    while (iter.hasNext()) {
+      sum = safeAdd(sum, iter.next());
+    }
+    return sum;
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    IteratorOptions io = super.describeOptions();
+    io.setName("sum");
+    io.setDescription("SummingCombiner interprets Values as Longs and adds them together.  A variety of encodings (variable length, fixed length, or string) are available");
+    return io;
+  }
+}
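
Likewise, a sketch of configuring SummingCombiner (same assumptions as above; the "count" column family is illustrative):

    IteratorSetting is = new IteratorSetting(10, "sum", SummingCombiner.class);
    SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
    SummingCombiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("count")));
    connector.tableOperations().attachIterator("mytable", is);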

