incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1210600 [4/16] - in /incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/ ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/ ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/ ingest/src/main/java/protobuf/ ...
Date Mon, 05 Dec 2011 20:05:51 GMT
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package iterator;
 
 import java.io.IOException;
@@ -43,286 +43,280 @@ import com.esotericsoftware.kryo.Kryo;
 
 /**
  * 
- * This iterator aggregates rows together using the specified key comparator. Subclasses will
- * provide their own implementation of fillMap which will fill the supplied EventFields object
- * with field names (key) and field values (value). After all fields have been put into the 
- * aggregated object (by aggregating all columns with the same key), the EventFields object will be compared
- * against the supplied expression. If the expression returns true, then the return key and
- * return value can be retrieved via getTopKey() and getTopValue(). 
+ * This iterator aggregates rows together using the specified key comparator. Subclasses will provide their own implementation of fillMap which will fill the
+ * supplied EventFields object with field names (key) and field values (value). After all fields have been put into the aggregated object (by aggregating all
+ * columns with the same key), the EventFields object will be compared against the supplied expression. If the expression returns true, then the return key and
+ * return value can be retrieved via getTopKey() and getTopValue().
  * 
- * Optionally, the caller can set an expression (field operator value) that should not be evaluated against
- * the event. For example, if the query is "A == 'foo' and B == 'bar'", but for some reason B may not be in
- * the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the events to be
- * evaluated against the remainder of the expression and still return as true.
+ * Optionally, the caller can set an expression (field operator value) that should not be evaluated against the event. For example, if the query is
+ * "A == 'foo' and B == 'bar'", but for some reason B may not be in the data, then setting the UNEVALUATED_EXPRESSIONS option to "B == 'bar'" will allow the
+ * events to be evaluated against the remainder of the expression and still return as true.
  * 
- * By default this iterator will return all Events in the shard. If the START_DATE and
- * END_DATE are specified, then this iterator will evaluate the timestamp of the key against
- * the start and end dates. If the event date is not within the range of start to end, then it is skipped.
+ * By default this iterator will return all Events in the shard. If the START_DATE and END_DATE are specified, then this iterator will evaluate the timestamp of
+ * the key against the start and end dates. If the event date is not within the range of start to end, then it is skipped.
  * 
- * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value. 
+ * This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
  * 
  */
-public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key, Value>, OptionDescriber {
-
-    private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
-    protected static final byte[] NULL_BYTE = new byte[0];
-    public static final String QUERY_OPTION = "expr";
-    public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
-    
-    private PartialKey comparator = null;
-    private SortedKeyValueIterator<Key, Value> iterator;
-    private Key currentKey = new Key();
-    private Key returnKey;
-    private Value returnValue;
-    private String expression;
-    private QueryEvaluator evaluator;
-    private EventFields event = null;
-    private static Kryo kryo = new Kryo();
-    private Range seekRange = null;
-    private Set<String> skipExpressions = null;
-
-    protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
-        iterator = other.iterator.deepCopy(env);
-        event = other.event;
-    }
-
-    public AbstractEvaluatingIterator() {
-    }
-
-    /**
-     * Implementations will return the PartialKey value to use for comparing keys for aggregating events
-     *
-     * @return the type of comparator to use
-     */
-    public abstract PartialKey getKeyComparator();
-
-    /**
-     * When the query expression evaluates to true against the event, the event fields
-     * will be serialized into the Value and returned up the iterator stack. Implemenations
-     * will need to provide a key to be used with the event.
-     *
-     * @param k
-     * @return the key that should be returned with the map of values.
-     */
-    public abstract Key getReturnKey(Key k) throws Exception;
-
-    /**
-     * Implementations will need to fill the map with field visibilities, names, and
-     * values. When all fields have been aggregated the event will be evaluated against
-     * the query expression.
-     *
-     * @param event Multimap of event names and fields.
-     * @param key current Key
-     * @param value current Value
-     */
-    public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
-
-    /**
-     * Check to see if this key should be acted upon. Provides the ability to skip this key
-     * and all of the following ones that match using the comparator.
-     *
-     * @param key
-     * @return
-     */
-    public abstract boolean isKeyAccepted(Key key);
-
-    /**
-     * Reset state.
-     */
-    public void reset() {
-        event.clear();
+public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
+  protected static final byte[] NULL_BYTE = new byte[0];
+  public static final String QUERY_OPTION = "expr";
+  public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
+  
+  private PartialKey comparator = null;
+  private SortedKeyValueIterator<Key,Value> iterator;
+  private Key currentKey = new Key();
+  private Key returnKey;
+  private Value returnValue;
+  private String expression;
+  private QueryEvaluator evaluator;
+  private EventFields event = null;
+  private static Kryo kryo = new Kryo();
+  private Range seekRange = null;
+  private Set<String> skipExpressions = null;
+  
+  protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
+    iterator = other.iterator.deepCopy(env);
+    event = other.event;
+  }
+  
+  public AbstractEvaluatingIterator() {}
+  
+  /**
+   * Implementations will return the PartialKey value to use for comparing keys for aggregating events
+   * 
+   * @return the type of comparator to use
+   */
+  public abstract PartialKey getKeyComparator();
+  
+  /**
+   * When the query expression evaluates to true against the event, the event fields will be serialized into the Value and returned up the iterator stack.
+   * Implemenations will need to provide a key to be used with the event.
+   * 
+   * @param k
+   * @return the key that should be returned with the map of values.
+   */
+  public abstract Key getReturnKey(Key k) throws Exception;
+  
+  /**
+   * Implementations will need to fill the map with field visibilities, names, and values. When all fields have been aggregated the event will be evaluated
+   * against the query expression.
+   * 
+   * @param event
+   *          Multimap of event names and fields.
+   * @param key
+   *          current Key
+   * @param value
+   *          current Value
+   */
+  public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
+  
+  /**
+   * Check to see if this key should be acted upon. Provides the ability to skip this key and all of the following ones that match using the comparator.
+   * 
+   * @param key
+   * @return
+   */
+  public abstract boolean isKeyAccepted(Key key);
+  
+  /**
+   * Reset state.
+   */
+  public void reset() {
+    event.clear();
+  }
+  
+  private void aggregateRowColumn(EventFields event) throws IOException {
+    
+    currentKey.set(iterator.getTopKey());
+    
+    try {
+      fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+      iterator.next();
+      
+      while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
+        fillMap(event, iterator.getTopKey(), iterator.getTopValue());
+        iterator.next();
+      }
+      
+      // Get the return key
+      returnKey = getReturnKey(currentKey);
+    } catch (Exception e) {
+      throw new IOException("Error aggregating event", e);
     }
-
-    private void aggregateRowColumn(EventFields event) throws IOException {
-
-        currentKey.set(iterator.getTopKey());
-
-        try {
-            fillMap(event, iterator.getTopKey(), iterator.getTopValue());
-            iterator.next();
-
-            while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator)) {
-                fillMap(event, iterator.getTopKey(), iterator.getTopValue());
-                iterator.next();
-            }
-
-            //Get the return key
-            returnKey = getReturnKey(currentKey);
-        } catch (Exception e) {
-            throw new IOException("Error aggregating event", e);
+    
+  }
+  
+  private void findTop() throws IOException {
+    do {
+      reset();
+      // check if aggregation is needed
+      if (iterator.hasTop()) {
+        // Check to see if the current key is accepted. For example in the wiki
+        // table there are field index rows. We don't want to process those in
+        // some cases so return right away. Consume all of the non-accepted keys
+        while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
+          iterator.next();
         }
-
-    }
-
-    private void findTop() throws IOException {
-        do {
-            reset();
-            //check if aggregation is needed
-            if (iterator.hasTop()) {
-                //Check to see if the current key is accepted. For example in the wiki
-                //table there are field index rows. We don't want to process those in
-                //some cases so return right away. Consume all of the non-accepted keys
-                while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
-                    iterator.next();
-                }
-
-                if (iterator.hasTop()) {
-                    aggregateRowColumn(event);
-                    
-                    //Evaluate the event against the expression
-                    if (event.size() > 0 && this.evaluator.evaluate(event)) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Event evaluated to true, key = " + returnKey);
-                        }
-                        //Create a byte array
-                        byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
-                        //Wrap in ByteBuffer to work with Kryo
-                        ByteBuffer buf = ByteBuffer.wrap(serializedMap);
-                        //Serialize the EventFields object
-                        event.writeObjectData(kryo, buf);
-                        //Truncate array to the used size.
-                        returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
-                    } else {
-                        returnKey = null;
-                        returnValue = null;
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Iterator no longer has top.");
-                    }
-                }
-            } else {
-                log.info("Iterator.hasTop() == false");
+        
+        if (iterator.hasTop()) {
+          aggregateRowColumn(event);
+          
+          // Evaluate the event against the expression
+          if (event.size() > 0 && this.evaluator.evaluate(event)) {
+            if (log.isDebugEnabled()) {
+              log.debug("Event evaluated to true, key = " + returnKey);
             }
-        } while (returnValue == null && iterator.hasTop());
-
-        //Sanity check. Make sure both returnValue and returnKey are null or both are not null
-        if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
-            log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
-            log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
-            throw new IOException("Return values are inconsistent");
-        }
-    }
-
-    public Key getTopKey() {
-        if (returnKey != null) {
-            return returnKey;
-        }
-        return iterator.getTopKey();
-    }
-
-    public Value getTopValue() {
-        if (returnValue != null) {
-            return returnValue;
-        }
-        return iterator.getTopValue();
-    }
-
-    public boolean hasTop() {
-        return returnKey != null || iterator.hasTop();
-    }
-
-    public void next() throws IOException {
-        if (returnKey != null) {
+            // Create a byte array
+            byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
+            // Wrap in ByteBuffer to work with Kryo
+            ByteBuffer buf = ByteBuffer.wrap(serializedMap);
+            // Serialize the EventFields object
+            event.writeObjectData(kryo, buf);
+            // Truncate array to the used size.
+            returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
+          } else {
             returnKey = null;
             returnValue = null;
-        } else if (iterator.hasTop()) {
-            iterator.next();
-        }
-
-        findTop();
-    }
-
-    /**
-     * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
-     * @param range
-     * @return
-     */
-    static Range maximizeStartKeyTimeStamp(Range range) {
-        Range seekRange = range;
-
-        if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
-            Key seekKey = new Key(seekRange.getStartKey());
-            seekKey.setTimestamp(Long.MAX_VALUE);
-            seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
-        }
-
-        return seekRange;
-    }
-
-    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-        //do not want to seek to the middle of a value that should be
-        //aggregated...
-
-        seekRange = maximizeStartKeyTimeStamp(range);
-
-        iterator.seek(seekRange, columnFamilies, inclusive);
-        findTop();
-
-        if (range.getStartKey() != null) {
-            while (hasTop()
-                    && getTopKey().equals(range.getStartKey(), this.comparator)
-                    && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
-                //the value has a more recent time stamp, so
-                //pass it up
-                //log.debug("skipping "+getTopKey());
-                next();
-            }
-
-            while (hasTop() && range.beforeStartKey(getTopKey())) {
-                next();
-            }
+          }
+        } else {
+          if (log.isDebugEnabled()) {
+            log.debug("Iterator no longer has top.");
+          }
         }
-
+      } else {
+        log.info("Iterator.hasTop() == false");
+      }
+    } while (returnValue == null && iterator.hasTop());
+    
+    // Sanity check. Make sure both returnValue and returnKey are null or both are not null
+    if (!((returnKey == null && returnValue == null) || (returnKey != null && returnValue != null))) {
+      log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
+      log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
+      throw new IOException("Return values are inconsistent");
+    }
+  }
+  
+  public Key getTopKey() {
+    if (returnKey != null) {
+      return returnKey;
+    }
+    return iterator.getTopKey();
+  }
+  
+  public Value getTopValue() {
+    if (returnValue != null) {
+      return returnValue;
+    }
+    return iterator.getTopValue();
+  }
+  
+  public boolean hasTop() {
+    return returnKey != null || iterator.hasTop();
+  }
+  
+  public void next() throws IOException {
+    if (returnKey != null) {
+      returnKey = null;
+      returnValue = null;
+    } else if (iterator.hasTop()) {
+      iterator.next();
     }
-
-    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
-        validateOptions(options);
-        event = new EventFields();
-        this.comparator = getKeyComparator();
-        this.iterator = source;
-        try {
-        	//Replace any expressions that we should not evaluate.
-        	if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
-        		for (String skip : this.skipExpressions) {
-        			//Expression should have form: field<sp>operator<sp>literal.
-        			//We are going to replace the expression with field == null.
-        			String field = skip.substring(0, skip.indexOf(" ") -1);
-        			this.expression = this.expression.replaceAll(skip, field+" == null");
-        		}
-        	}
-            this.evaluator = new QueryEvaluator(this.expression);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Failed to parse query", e);
-        }
-        EventFields.initializeKryo(kryo);
+    
+    findTop();
+  }
+  
+  /**
+   * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
+   * 
+   * @param range
+   * @return
+   */
+  static Range maximizeStartKeyTimeStamp(Range range) {
+    Range seekRange = range;
+    
+    if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
+      Key seekKey = new Key(seekRange.getStartKey());
+      seekKey.setTimestamp(Long.MAX_VALUE);
+      seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
     }
-
-    public IteratorOptions describeOptions() {
-    	Map<String,String> options = new HashMap<String,String>();
-    	options.put(QUERY_OPTION, "query expression");
-    	options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
-        return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+    
+    return seekRange;
+  }
+  
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    // do not want to seek to the middle of a value that should be
+    // aggregated...
+    
+    seekRange = maximizeStartKeyTimeStamp(range);
+    
+    iterator.seek(seekRange, columnFamilies, inclusive);
+    findTop();
+    
+    if (range.getStartKey() != null) {
+      while (hasTop() && getTopKey().equals(range.getStartKey(), this.comparator) && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
+        // the value has a more recent time stamp, so
+        // pass it up
+        // log.debug("skipping "+getTopKey());
+        next();
+      }
+      
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
     }
-
-    public boolean validateOptions(Map<String, String> options) {
-        if (!options.containsKey(QUERY_OPTION))
-            return false;
-        else 
-        	this.expression = options.get(QUERY_OPTION);
-        
-        if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
-        	String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
-        	if (expressionList != null && !expressionList.trim().equals("")) {
-	        	this.skipExpressions = new HashSet<String>();
-	        	for (String e : expressionList.split(","))
-	        		this.skipExpressions.add(e);
-        	}
+    
+  }
+  
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    validateOptions(options);
+    event = new EventFields();
+    this.comparator = getKeyComparator();
+    this.iterator = source;
+    try {
+      // Replace any expressions that we should not evaluate.
+      if (null != this.skipExpressions && this.skipExpressions.size() != 0) {
+        for (String skip : this.skipExpressions) {
+          // Expression should have form: field<sp>operator<sp>literal.
+          // We are going to replace the expression with field == null.
+          String field = skip.substring(0, skip.indexOf(" ") - 1);
+          this.expression = this.expression.replaceAll(skip, field + " == null");
         }
-        return true;
-    }
-
-    public String getQueryExpression() {
-        return this.expression;
-    }
+      }
+      this.evaluator = new QueryEvaluator(this.expression);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Failed to parse query", e);
+    }
+    EventFields.initializeKryo(kryo);
+  }
+  
+  public IteratorOptions describeOptions() {
+    Map<String,String> options = new HashMap<String,String>();
+    options.put(QUERY_OPTION, "query expression");
+    options.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");
+    return new IteratorOptions(getClass().getSimpleName(), "evaluates event objects against an expression", options, null);
+  }
+  
+  public boolean validateOptions(Map<String,String> options) {
+    if (!options.containsKey(QUERY_OPTION))
+      return false;
+    else
+      this.expression = options.get(QUERY_OPTION);
+    
+    if (options.containsKey(UNEVALUTED_EXPRESSIONS)) {
+      String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
+      if (expressionList != null && !expressionList.trim().equals("")) {
+        this.skipExpressions = new HashSet<String>();
+        for (String e : expressionList.split(","))
+          this.skipExpressions.add(e);
+      }
+    }
+    return true;
+  }
+  
+  public String getQueryExpression() {
+    return this.expression;
+  }
 }

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java Mon Dec  5 20:05:49 2011
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package iterator;
 
 import java.io.IOException;
@@ -32,896 +32,909 @@ import org.apache.commons.codec.binary.B
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
-public class AndIterator implements SortedKeyValueIterator<Key, Value> {
-
-    protected static final Logger log = Logger.getLogger(AndIterator.class);
-    private TermSource[] sources;
-    private int sourcesCount = 0;
-    protected Text nullText = new Text();
-    protected final byte[] emptyByteArray = new byte[0];
-    private Key topKey = null;
-    protected Value value = new Value(emptyByteArray);
-    private Range overallRange;
-    private Text currentRow = null;
-    private Text currentTerm = new Text(emptyByteArray);
-    private Text currentDocID = new Text(emptyByteArray);
-    private Collection<ByteSequence> seekColumnFamilies;
-    private boolean inclusive;
-    private Text parentEndRow;
-
-    /**
-     * Used in representing a Term that is intersected on.
-     */
-    protected static class TermSource {
-
-        public SortedKeyValueIterator<Key, Value> iter;
-        public Text dataLocation;
-        public Text term;
-        public boolean notFlag;
-
-        public TermSource(TermSource other) {
-            this.iter = other.iter;
-            this.dataLocation = other.dataLocation;
-            this.term = other.term;
-            this.notFlag = other.notFlag;
-        }
-
-        public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term) {
-            this.iter = iter;
-            this.dataLocation = dataLocation;
-            this.term = term;
-            this.notFlag = false;
-        }
-
-        public TermSource(SortedKeyValueIterator<Key, Value> iter, Text dataLocation, Text term, boolean notFlag) {
-            this.iter = iter;
-            this.dataLocation = dataLocation;
-            this.term = term;
-            this.notFlag = notFlag;
-        }
-
-        public String getTermString() {
-            return (this.term == null) ? new String("Iterator") : this.term.toString();
-        }
-    }
-
-
-    /* |   Row	  |   Column Family  |     Column Qualifier      |  Value
-     * | {RowID}  |  {dataLocation}  | {term}\0{dataType}\0{UID} |  Empty
-     */
-    protected Text getPartition(Key key) {
-        return key.getRow();
-    }
-
-    /**
-     * Returns the given key's dataLocation
-     * @param key
-     * @return
-     */
-    protected Text getDataLocation(Key key) {
-        return key.getColumnFamily();
-    }
-
-    /**
-     * Returns the given key's term
-     * @param key
-     * @return
-     */
-    protected Text getTerm(Key key) {
-        int idx = 0;
-        String sKey = key.getColumnQualifier().toString();
-
-        idx = sKey.indexOf("\0");
-        return new Text(sKey.substring(0, idx));
-    }
-
-    /**
-     * Returns the given key's DocID
-     * @param key
-     * @return
-     */
-    protected Text getDocID(Key key) {
-        int idx = 0;
-        String sKey = key.getColumnQualifier().toString();
-
-        idx = sKey.indexOf("\0");
-        return new Text(sKey.substring(idx + 1));
-    }
-
-    /**
-     * Returns the given key's UID
-     * @param key
-     * @return
-     */
-    protected String getUID(Key key) {
-        int idx = 0;
-        String sKey = key.getColumnQualifier().toString();
-
-        idx = sKey.lastIndexOf("\0");
-        return sKey.substring(idx + 1);
-    }
-
-    /**
-     * Build a key from the given row and dataLocation
-     * @param row The desired row
-     * @param dataLocation The desired dataLocation
-     * @return
-     */
-    protected Key buildKey(Text row, Text dataLocation) {
-        return new Key(row, (dataLocation == null) ? nullText : dataLocation);
-    }
-
-    /**
-     * Build a key from the given row, dataLocation, and term
-     * @param row The desired row
-     * @param dataLocation The desired dataLocation
-     * @param term The desired term
-     * @return
-     */
-    protected Key buildKey(Text row, Text dataLocation, Text term) {
-        return new Key(row,
-                (dataLocation == null) ? nullText : dataLocation,
-                (term == null) ? nullText : term);
-    }
-
-    /**
-     * Return the key that directly follows the given key
-     * @param key The key who will be directly before the returned key
-     * @return
-     */
-    protected Key buildFollowingPartitionKey(Key key) {
-        return key.followingKey(PartialKey.ROW);
-    }
-
-    /**
-     * Empty default constructor
-     */
-    public AndIterator() {
-    }
-
-    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
-        return new AndIterator(this, env);
-    }
-
-    public AndIterator(AndIterator other, IteratorEnvironment env) {
-        if (other.sources != null) {
-            sourcesCount = other.sourcesCount;
-            sources = new TermSource[sourcesCount];
-            for (int i = 0; i < sourcesCount; i++) {
-                sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].dataLocation, other.sources[i].term);
-            }
-        }
-    }
-
-    public Key getTopKey() {
-        return topKey;
-    }
-
-    public Value getTopValue() {
-        return value;
-    }
-
-    public boolean hasTop() {
-        return currentRow != null;
-    }
-
-    /**
-     * Find the next key in the current TermSource that is at or beyond
-     * the cursor (currentRow, currentTerm, currentDocID).
-     * @param sourceID The index of the current source in <code>sources</code>
-     * @return True if the source advanced beyond the cursor
-     * @throws IOException
-     */
-    private boolean seekOneSource(TermSource ts) throws IOException {
-        /* Within this loop progress must be made in one of the following forms:
-         *    - currentRow, currentTerm, or curretDocID must be increased
-         *    - the given source must advance its iterator
-         *  This loop will end when any of the following criteria are met
-         *    - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentTerm, currentDocID)
-         *    - the given source is out of data and currentRow is set to null
-         *    - the given source has advanced beyond the endRow and currentRow is set to null
-         */
-
-        // precondition: currentRow is not null
-        boolean advancedCursor = false;
-
-        while (true) {
-            if (ts.iter.hasTop() == false) {
-                if (log.isDebugEnabled()) {
-                    log.debug("The current iterator no longer has a top");
-                }
-
-                // If we got to the end of an iterator, found a Match if it's a NOT
-                if (ts.notFlag) {
-                    break;
-                }
-
-                currentRow = null;
-                // setting currentRow to null counts as advancing the cursor
-                return true;
-            }
-
-            // check if we're past the end key
-            int endCompare = -1;
-
-            if (log.isDebugEnabled()) {
-                log.debug("Current topKey = " + ts.iter.getTopKey());
-            }
-
-            // we should compare the row to the end of the range
-            if (overallRange.getEndKey() != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("II.seekOneSource overallRange.getEndKey() != null");
-                }
-
-                endCompare = overallRange.getEndKey().getRow().compareTo(ts.iter.getTopKey().getRow());
-
-                if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("II.seekOneSource at the end of the tablet server");
-                    }
-
-                    currentRow = null;
-
-                    // setting currentRow to null counts as advancing the cursor
-                    return true;
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("II.seekOneSource overallRange.getEndKey() == null");
-                }
-            }
-
-            // Compare the Row IDs
-            int partitionCompare = currentRow.compareTo(getPartition(ts.iter.getTopKey()));
-            if (log.isDebugEnabled()) {
-                log.debug("Current partition: " + currentRow);
-            }
-
-            // check if this source is already at or beyond currentRow
-            // if not, then seek to at least the current row
-            if (partitionCompare > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Need to seek to the current row");
-
-                    // seek to at least the currentRow
-                    log.debug("ts.dataLocation = " + ts.dataLocation.getBytes());
-                    log.debug("Term = " + new Text(ts.term + "\0" + currentDocID).getBytes());
-                }
-
-                Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);//new Text(ts.term + "\0" + currentDocID));
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Seeking to: " + seekKey);
-                }
-
-                ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-                continue;
-            }
-
-            // check if this source has gone beyond currentRow
-            // if so, advance currentRow
-            if (partitionCompare < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Went too far beyond the currentRow");
-                }
-
-                if (ts.notFlag) {
-                    break;
-                }
-
-                currentRow.set(getPartition(ts.iter.getTopKey()));
-                currentDocID.set(emptyByteArray);
-
-                advancedCursor = true;
-                continue;
-            }
-
-            // we have verified that the current source is positioned in currentRow
-            // now we must make sure we're in the right columnFamily in the current row
-            if (ts.dataLocation != null) {
-                int dataLocationCompare = ts.dataLocation.compareTo(getDataLocation(ts.iter.getTopKey()));
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Comparing dataLocations");
-                    log.debug("dataLocation = " + ts.dataLocation);
-                    log.debug("newDataLocation = " + getDataLocation(ts.iter.getTopKey()));
-                }
-
-                // check if this source is already on the right columnFamily
-                // if not, then seek forwards to the right columnFamily
-                if (dataLocationCompare > 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Need to seek to the right dataLocation");
-                    }
-
-                    Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);//, new Text(ts.term + "\0" + currentDocID));
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Seeking to: " + seekKey);
-                    }
-
-                    ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-                    if(!ts.iter.hasTop()){
-                        currentRow = null;
-                        return true;
-                    }
-
-                    continue;
-                }
-                // check if this source is beyond the right columnFamily
-                // if so, then seek to the next row
-                if (dataLocationCompare < 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Went too far beyond the dataLocation");
-                    }
-
-                    if (endCompare == 0) {
-                        // we're done
-                        currentRow = null;
-
-                        // setting currentRow to null counts as advancing the cursor
-                        return true;
-                    }
-
-                    // Seeking beyond the current dataLocation gives a valid negated result
-                    if (ts.notFlag) {
-                        break;
-                    }
-
-                    Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Seeking to: " + seekKey);
-                    }
-
-                    ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-                    if(!ts.iter.hasTop()){
-                        currentRow = null;
-                        return true;
-                    }
-                    continue;
-                }
-            }
-
-            // Compare the Terms
-            int termCompare = ts.term.compareTo(getTerm(ts.iter.getTopKey()));
-            if (log.isDebugEnabled()) {
-                log.debug("term = " + ts.term);
-                log.debug("newTerm = " + getTerm(ts.iter.getTopKey()));
-            }
-
-            // We need to seek down farther into the data
-            if (termCompare > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Need to seek to the right term");
-                }
-
-                Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0"));//new Text(ts.term + "\0" + currentDocID));
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Seeking to: " + seekKey);
-                }
-
-                ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-                if (!ts.iter.hasTop()) {
-                    currentRow = null;
-                    return true;
-                }
-
-                //currentTerm = getTerm(ts.iter.getTopKey());
-
-                if (log.isDebugEnabled()) {
-                    log.debug("topKey after seeking to correct term: " + ts.iter.getTopKey());
-                }
-
-                continue;
-            }
-
-            // We've jumped out of the current term, set the new term as currentTerm and start looking again
-            if (termCompare < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("TERM: Need to jump to the next row");
-                }
-
-                if (endCompare == 0) {
-                    currentRow = null;
-
-                    return true;
-                }
-
-                if (ts.notFlag) {
-                    break;
-                }
-
-                Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
-                if (log.isDebugEnabled()) {
-                    log.debug("Using this key to find the next key: " + ts.iter.getTopKey());
-                    log.debug("Seeking to: " + seekKey);
-                }
-
-                ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-
-                if (!ts.iter.hasTop()) {
-                    currentRow = null;
-                    return true;
-                }
-
-                currentTerm = getTerm(ts.iter.getTopKey());
-
-                continue;
-            }
-
-            // Compare the DocIDs
-            Text docid = getDocID(ts.iter.getTopKey());
-            int docidCompare = currentDocID.compareTo(docid);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Comparing DocIDs");
-                log.debug("currentDocID = " + currentDocID);
-                log.debug("docid = " + docid);
-            }
-
-            // The source isn't at the right DOC
-            if (docidCompare > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Need to seek to the correct docid");
-                }
-
-                // seek forwards
-                Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0" + currentDocID));
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Seeking to: " + seekKey);
-                }
-
-                ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
-
-                continue;
-            }
-
-            // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
-            if (docidCompare < 0) {
-                if (ts.notFlag) {
-                    break;
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("We went too far, update the currentDocID to be the location of where were seek'ed to");
-                }
-
-                currentDocID.set(docid);
-                advancedCursor = true;
-                break;
-            }
-
-            // Set the term as currentTerm (in case we found this record on the first try)
-            currentTerm = getTerm(ts.iter.getTopKey());
-
-            if (log.isDebugEnabled()) {
-                log.debug("currentTerm = " + currentTerm);
-            }
-
-            // If we're negated, next() the first TermSource since we guaranteed it was not a NOT term
-            if (ts.notFlag) {
-                sources[0].iter.next();
-                advancedCursor = true;
-            }
-
-            // If we got here, we have a match
-            break;
-        }
-
-        return advancedCursor;
-    }
-
-    public void next() throws IOException {
+public class AndIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  protected static final Logger log = Logger.getLogger(AndIterator.class);
+  private TermSource[] sources;
+  private int sourcesCount = 0;
+  protected Text nullText = new Text();
+  protected final byte[] emptyByteArray = new byte[0];
+  private Key topKey = null;
+  protected Value value = new Value(emptyByteArray);
+  private Range overallRange;
+  private Text currentRow = null;
+  private Text currentTerm = new Text(emptyByteArray);
+  private Text currentDocID = new Text(emptyByteArray);
+  private Collection<ByteSequence> seekColumnFamilies;
+  private boolean inclusive;
+  private Text parentEndRow;
+  
+  /**
+   * Used in representing a Term that is intersected on.
+   */
+  protected static class TermSource {
+    
+    public SortedKeyValueIterator<Key,Value> iter;
+    public Text dataLocation;
+    public Text term;
+    public boolean notFlag;
+    
+    public TermSource(TermSource other) {
+      this.iter = other.iter;
+      this.dataLocation = other.dataLocation;
+      this.term = other.term;
+      this.notFlag = other.notFlag;
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term) {
+      this.iter = iter;
+      this.dataLocation = dataLocation;
+      this.term = term;
+      this.notFlag = false;
+    }
+    
+    public TermSource(SortedKeyValueIterator<Key,Value> iter, Text dataLocation, Text term, boolean notFlag) {
+      this.iter = iter;
+      this.dataLocation = dataLocation;
+      this.term = term;
+      this.notFlag = notFlag;
+    }
+    
+    public String getTermString() {
+      return (this.term == null) ? new String("Iterator") : this.term.toString();
+    }
+  }
+  
+  /*
+   * Key layout: Row = {RowID} | Column Family = {dataLocation} | Column Qualifier = {term}\0{dataType}\0{UID} | Value = Empty
+   */
+  protected Text getPartition(Key key) {
+    return key.getRow();
+  }
+  
+  /**
+   * Returns the given key's dataLocation
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getDataLocation(Key key) {
+    return key.getColumnFamily();
+  }
+  
+  /**
+   * Returns the given key's term
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getTerm(Key key) {
+    int idx = 0;
+    String sKey = key.getColumnQualifier().toString();
+    
+    idx = sKey.indexOf("\0");
+    return new Text(sKey.substring(0, idx));
+  }
+  
+  /**
+   * Returns the given key's DocID
+   * 
+   * @param key
+   * @return
+   */
+  protected Text getDocID(Key key) {
+    int idx = 0;
+    String sKey = key.getColumnQualifier().toString();
+    
+    idx = sKey.indexOf("\0");
+    return new Text(sKey.substring(idx + 1));
+  }
+  
+  /**
+   * Returns the given key's UID
+   * 
+   * @param key
+   * @return
+   */
+  protected String getUID(Key key) {
+    int idx = 0;
+    String sKey = key.getColumnQualifier().toString();
+    
+    idx = sKey.lastIndexOf("\0");
+    return sKey.substring(idx + 1);
+  }
+  
+  /**
+   * Build a key from the given row and dataLocation
+   * 
+   * @param row
+   *          The desired row
+   * @param dataLocation
+   *          The desired dataLocation
+   * @return
+   */
+  protected Key buildKey(Text row, Text dataLocation) {
+    return new Key(row, (dataLocation == null) ? nullText : dataLocation);
+  }
+  
+  /**
+   * Build a key from the given row, dataLocation, and term
+   * 
+   * @param row
+   *          The desired row
+   * @param dataLocation
+   *          The desired dataLocation
+   * @param term
+   *          The desired term
+   * @return
+   */
+  protected Key buildKey(Text row, Text dataLocation, Text term) {
+    return new Key(row, (dataLocation == null) ? nullText : dataLocation, (term == null) ? nullText : term);
+  }
+  
+  /**
+   * Return the key that directly follows the given key
+   * 
+   * @param key
+   *          The key who will be directly before the returned key
+   * @return
+   */
+  protected Key buildFollowingPartitionKey(Key key) {
+    return key.followingKey(PartialKey.ROW);
+  }
+  
+  /**
+   * Empty default constructor
+   */
+  public AndIterator() {}
+  
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new AndIterator(this, env);
+  }
+  
+  public AndIterator(AndIterator other, IteratorEnvironment env) {
+    if (other.sources != null) {
+      sourcesCount = other.sourcesCount;
+      sources = new TermSource[sourcesCount];
+      for (int i = 0; i < sourcesCount; i++) {
+        sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].dataLocation, other.sources[i].term);
+      }
+    }
+  }
+  
+  public Key getTopKey() {
+    return topKey;
+  }
+  
+  public Value getTopValue() {
+    return value;
+  }
+  
+  public boolean hasTop() {
+    return currentRow != null;
+  }
+  
+  /**
+   * Find the next key in the current TermSource that is at or beyond the cursor (currentRow, currentTerm, currentDocID).
+   * 
+   * @param ts
+   *          The term source whose iterator is advanced to at or beyond the cursor
+   * @return True if the source advanced beyond the cursor
+   * @throws IOException
+   */
+  private boolean seekOneSource(TermSource ts) throws IOException {
+    /*
+     * Within this loop, progress must be made in one of two ways: (1) currentRow, currentTerm, or currentDocID is increased, or (2) the given source advances
+     * its iterator. The loop ends when any of the following holds: (a) the iterator for the given source is pointing to the key
+     * (currentRow, columnFamilies[sourceID], currentTerm, currentDocID); (b) the given source is out of data and currentRow is set to null; or (c) the given
+     * source has advanced beyond the endRow and currentRow is set to null.
+     */
+    
+    // precondition: currentRow is not null
+    boolean advancedCursor = false;
+    
+    while (true) {
+      if (ts.iter.hasTop() == false) {
         if (log.isDebugEnabled()) {
-            log.debug("In ModifiedIntersectingIterator.next()");
-        }
-
-        if (currentRow == null) {
-            return;
-        }
-
-        // precondition: the current row is set up and the sources all have the same column qualifier
-        // while we don't have a match, seek in the source with the smallest column qualifier
-        sources[0].iter.next();
-
-        advanceToIntersection();
-
-        if (hasTop()) {
-            if (overallRange != null && !overallRange.contains(topKey)) {
-                topKey = null;
-            }
+          log.debug("The current iterator no longer has a top");
         }
-    }
-
-    protected void advanceToIntersection() throws IOException {
+        
+        // If we got to the end of an iterator, found a Match if it's a NOT
+        if (ts.notFlag) {
+          break;
+        }
+        
+        currentRow = null;
+        // setting currentRow to null counts as advancing the cursor
+        return true;
+      }
+      
+      // check if we're past the end key
+      int endCompare = -1;
+      
+      if (log.isDebugEnabled()) {
+        log.debug("Current topKey = " + ts.iter.getTopKey());
+      }
+      
+      // we should compare the row to the end of the range
+      if (overallRange.getEndKey() != null) {
         if (log.isDebugEnabled()) {
-            log.debug("In AndIterator.advanceToIntersection()");
+          log.debug("II.seekOneSource overallRange.getEndKey() != null");
         }
-
-        boolean cursorChanged = true;
-        int numSeeks = 0;
-        while (cursorChanged) {
-            // seek all of the sources to at least the highest seen column qualifier in the current row
-            cursorChanged = false;
-            for (TermSource ts : sources) {
-                if (currentRow == null) {
-                    topKey = null;
-                    return;
-                }
-                numSeeks++;
-                if (seekOneSource(ts)) {
-                    cursorChanged = true;
-                    break;
-                }
-            }
+        
+        endCompare = overallRange.getEndKey().getRow().compareTo(ts.iter.getTopKey().getRow());
+        
+        if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+          if (log.isDebugEnabled()) {
+            log.debug("II.seekOneSource at the end of the tablet server");
+          }
+          
+          currentRow = null;
+          
+          // setting currentRow to null counts as advancing the cursor
+          return true;
         }
-
-        topKey = buildKey(currentRow, currentTerm, currentDocID);
-
+      } else {
         if (log.isDebugEnabled()) {
-            log.debug("ModifiedIntersectingIterator: Got a match: " + topKey);
-        }
-    }
-
-    public static String stringTopKey(SortedKeyValueIterator<Key, Value> iter) {
-        if (iter.hasTop()) {
-            return iter.getTopKey().toString();
-        }
-        return "";
-    }
-    public static final String columnFamiliesOptionName = "columnFamilies";
-    public static final String termValuesOptionName = "termValues";
-    public static final String notFlagsOptionName = "notFlags";
-
-    /**
-     * Encode a <code>Text</code> array of all the columns to intersect on
-     * @param columns The columns to be encoded
-     * @return
-     */
-    public static String encodeColumns(Text[] columns) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < columns.length; i++) {
-            sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
-            sb.append('\n');
-        }
-        return sb.toString();
-    }
-
-    /**
-     * Encode a <code>Text</code> array of all of the terms to intersect on. The terms should match the columns
-     * in a one-to-one manner
-     * @param terms The terms to be encoded
-     * @return
-     */
-    public static String encodeTermValues(Text[] terms) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < terms.length; i++) {
-            sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(terms[i]))));
-            sb.append('\n');
+          log.debug("II.seekOneSource overallRange.getEndKey() == null");
         }
-
-        return sb.toString();
-    }
-
-    /**
-     * Encode an array of <code>booleans</code> denoted which columns are NOT'ed
-     * @param flags The array of NOTs
-     * @return
-     */
-    public static String encodeBooleans(boolean[] flags) {
-        byte[] bytes = new byte[flags.length];
-        for (int i = 0; i < flags.length; i++) {
-            if (flags[i]) {
-                bytes[i] = 1;
-            } else {
-                bytes[i] = 0;
-            }
+      }
+      
+      // Compare the Row IDs
+      int partitionCompare = currentRow.compareTo(getPartition(ts.iter.getTopKey()));
+      if (log.isDebugEnabled()) {
+        log.debug("Current partition: " + currentRow);
+      }
+      
+      // check if this source is already at or beyond currentRow
+      // if not, then seek to at least the current row
+      if (partitionCompare > 0) {
+        if (log.isDebugEnabled()) {
+          log.debug("Need to seek to the current row");
+          
+          // seek to at least the currentRow
+          log.debug("ts.dataLocation = " + ts.dataLocation.getBytes());
+          log.debug("Term = " + new Text(ts.term + "\0" + currentDocID).getBytes());
+        }
+        
+        Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// new Text(ts.term + "\0" + currentDocID));
+        
+        if (log.isDebugEnabled()) {
+          log.debug("Seeking to: " + seekKey);
         }
-        return new String(Base64.encodeBase64(bytes));
-    }
-
-    /**
-     * Decode the encoded columns into a <code>Text</code> array
-     * @param columns The Base64 encoded String of the columns
-     * @return
-     */
-    public static Text[] decodeColumns(String columns) {
-        String[] columnStrings = columns.split("\n");
-        Text[] columnTexts = new Text[columnStrings.length];
-        for (int i = 0; i < columnStrings.length; i++) {
-            columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
+        
+        ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+        continue;
+      }
+      
+      // check if this source has gone beyond currentRow
+      // if so, advance currentRow
+      if (partitionCompare < 0) {
+        if (log.isDebugEnabled()) {
+          log.debug("Went too far beyond the currentRow");
         }
-
-        return columnTexts;
-    }
-
-    /**
-     * Decode the encoded terms into a <code>Text</code> array
-     * @param terms The Base64 encoded String of the terms
-     * @return
-     */
-    public static Text[] decodeTermValues(String terms) {
-        String[] termStrings = terms.split("\n");
-        Text[] termTexts = new Text[termStrings.length];
-        for (int i = 0; i < termStrings.length; i++) {
-            termTexts[i] = new Text(Base64.decodeBase64(termStrings[i].getBytes()));
+        
+        if (ts.notFlag) {
+          break;
         }
-
-        return termTexts;
-    }
-
-    /**
-     * Decode the encoded NOT flags into a <code>boolean</code> array
-     * @param flags
-     * @return
-     */
-    public static boolean[] decodeBooleans(String flags) {
-        // return null of there were no flags
-        if (flags == null) {
-            return null;
+        
+        currentRow.set(getPartition(ts.iter.getTopKey()));
+        currentDocID.set(emptyByteArray);
+        
+        advancedCursor = true;
+        continue;
+      }
+      
+      // we have verified that the current source is positioned in currentRow
+      // now we must make sure we're in the right columnFamily in the current row
+      if (ts.dataLocation != null) {
+        int dataLocationCompare = ts.dataLocation.compareTo(getDataLocation(ts.iter.getTopKey()));
+        
+        if (log.isDebugEnabled()) {
+          log.debug("Comparing dataLocations");
+          log.debug("dataLocation = " + ts.dataLocation);
+          log.debug("newDataLocation = " + getDataLocation(ts.iter.getTopKey()));
+        }
+        
+        // check if this source is already on the right columnFamily
+        // if not, then seek forwards to the right columnFamily
+        if (dataLocationCompare > 0) {
+          if (log.isDebugEnabled()) {
+            log.debug("Need to seek to the right dataLocation");
+          }
+          
+          Key seekKey = buildKey(currentRow, ts.dataLocation, nullText);// , new Text(ts.term + "\0" + currentDocID));
+          
+          if (log.isDebugEnabled()) {
+            log.debug("Seeking to: " + seekKey);
+          }
+          
+          ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          if (!ts.iter.hasTop()) {
+            currentRow = null;
+            return true;
+          }
+          
+          continue;
+        }
+        // check if this source is beyond the right columnFamily
+        // if so, then seek to the next row
+        if (dataLocationCompare < 0) {
+          if (log.isDebugEnabled()) {
+            log.debug("Went too far beyond the dataLocation");
+          }
+          
+          if (endCompare == 0) {
+            // we're done
+            currentRow = null;
+            
+            // setting currentRow to null counts as advancing the cursor
+            return true;
+          }
+          
+          // Seeking beyond the current dataLocation gives a valid negated result
+          if (ts.notFlag) {
+            break;
+          }
+          
+          Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
+          
+          if (log.isDebugEnabled()) {
+            log.debug("Seeking to: " + seekKey);
+          }
+          
+          ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+          if (!ts.iter.hasTop()) {
+            currentRow = null;
+            return true;
+          }
+          continue;
+        }
+      }
+      
+      // Compare the Terms
+      int termCompare = ts.term.compareTo(getTerm(ts.iter.getTopKey()));
+      if (log.isDebugEnabled()) {
+        log.debug("term = " + ts.term);
+        log.debug("newTerm = " + getTerm(ts.iter.getTopKey()));
+      }
+      
+      // We need to seek down farther into the data
+      if (termCompare > 0) {
+        if (log.isDebugEnabled()) {
+          log.debug("Need to seek to the right term");
         }
-
-        byte[] bytes = Base64.decodeBase64(flags.getBytes());
-        boolean[] bFlags = new boolean[bytes.length];
-        for (int i = 0; i < bytes.length; i++) {
-            if (bytes[i] == 1) {
-                bFlags[i] = true;
-            } else {
-                bFlags[i] = false;
-            }
+        
+        Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0"));// new Text(ts.term + "\0" + currentDocID));
+        
+        if (log.isDebugEnabled()) {
+          log.debug("Seeking to: " + seekKey);
         }
-
-        return bFlags;
-    }
-
-    public void init(SortedKeyValueIterator<Key, Value> source,
-            Map<String, String> options, IteratorEnvironment env) throws IOException {
+        
+        ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+        if (!ts.iter.hasTop()) {
+          currentRow = null;
+          return true;
+        }
+        
+        // currentTerm = getTerm(ts.iter.getTopKey());
+        
         if (log.isDebugEnabled()) {
-            log.debug("In AndIterator.init()");
+          log.debug("topKey after seeking to correct term: " + ts.iter.getTopKey());
         }
-
-        Text[] dataLocations = decodeColumns(options.get(columnFamiliesOptionName));
-        Text[] terms = decodeTermValues(options.get(termValuesOptionName));
-        boolean[] notFlags = decodeBooleans(options.get(notFlagsOptionName));
-
-        if (terms.length < 2) {
-            throw new IllegalArgumentException("AndIterator requires two or more columns families");
+        
+        continue;
+      }
+      
+      // We've jumped out of the current term, set the new term as currentTerm and start looking again
+      if (termCompare < 0) {
+        if (log.isDebugEnabled()) {
+          log.debug("TERM: Need to jump to the next row");
         }
-
-        // Scan the not flags.
-        // There must be at least one term that isn't negated
-        // And we are going to re-order such that the first term is not a ! term
-        if (notFlags == null) {
-            notFlags = new boolean[terms.length];
-            for (int i = 0; i < terms.length; i++) {
-                notFlags[i] = false;
-            }
+        
+        if (endCompare == 0) {
+          currentRow = null;
+          
+          return true;
+        }
+        
+        if (ts.notFlag) {
+          break;
         }
-
-        // Make sure that the first dataLocation/Term is not a NOT by swapping it with a later dataLocation/Term
-        if (notFlags[0]) {
-            for (int i = 1; i < notFlags.length; i++) {
-                if (notFlags[i] == false) {
-                    // Swap the terms
-                    Text swap = new Text(terms[0]);
-                    terms[0].set(terms[i]);
-                    terms[i].set(swap);
-
-                    // Swap the dataLocations
-                    swap.set(dataLocations[0]);
-                    dataLocations[0].set(dataLocations[i]);
-                    dataLocations[i].set(swap);
-
-                    // Flip the notFlags
-                    notFlags[0] = false;
-                    notFlags[i] = true;
-                    break;
-                }
-            }
-
-            if (notFlags[0]) {
-                throw new IllegalArgumentException("AndIterator requires at least one column family without not");
-            }
+        
+        Key seekKey = buildFollowingPartitionKey(ts.iter.getTopKey());
+        if (log.isDebugEnabled()) {
+          log.debug("Using this key to find the next key: " + ts.iter.getTopKey());
+          log.debug("Seeking to: " + seekKey);
         }
-
-        // Build up the array of sources that are to be intersected
-        sources = new TermSource[dataLocations.length];
-        sources[0] = new TermSource(source, dataLocations[0], terms[0]);
-        for (int i = 1; i < dataLocations.length; i++) {
-            sources[i] = new TermSource(source.deepCopy(env), dataLocations[i], terms[i], notFlags[i]);
+        
+        ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+        
+        if (!ts.iter.hasTop()) {
+          currentRow = null;
+          return true;
+        }
+        
+        currentTerm = getTerm(ts.iter.getTopKey());
+        
+        continue;
+      }
+      
+      // Compare the DocIDs
+      Text docid = getDocID(ts.iter.getTopKey());
+      int docidCompare = currentDocID.compareTo(docid);
+      
+      if (log.isDebugEnabled()) {
+        log.debug("Comparing DocIDs");
+        log.debug("currentDocID = " + currentDocID);
+        log.debug("docid = " + docid);
+      }
+      
+      // The source isn't at the right DOC
+      if (docidCompare > 0) {
+        if (log.isDebugEnabled()) {
+          log.debug("Need to seek to the correct docid");
         }
-
-        sourcesCount = dataLocations.length;
-    }
-
-    public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+        
+        // seek forwards
+        Key seekKey = buildKey(currentRow, ts.dataLocation, new Text(ts.term + "\0" + currentDocID));
+        
         if (log.isDebugEnabled()) {
-            log.debug("In AndIterator.seek()");
-            log.debug("AndIterator.seek Given range => " + range);
+          log.debug("Seeking to: " + seekKey);
         }
-        //if (firstSeek) {
-        overallRange = new Range(range);
-        //firstSeek = false;
-        //}
-        if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
-            this.parentEndRow = range.getEndKey().getRow();
+        
+        ts.iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
+        
+        continue;
+      }
+      
+      // if this source has advanced beyond currentDocID then advance currentDocID to it and mark the cursor as advanced
+      if (docidCompare < 0) {
+        if (ts.notFlag) {
+          break;
         }
-
-        //overallRange = new Range(range);
-        currentRow = new Text();
-        currentDocID.set(emptyByteArray);
-
-        this.seekColumnFamilies = seekColumnFamilies;
-        this.inclusive = inclusive;
-
-        // seek each of the sources to the right column family within the row given by key
-        for (int i = 0; i < sourcesCount; i++) {
-            Key sourceKey;
-            if (range.getStartKey() != null) {
-                // Build a key with the DocID if one is given
-                if (range.getStartKey().getColumnQualifier() != null) {
-                    sourceKey = buildKey(getPartition(range.getStartKey()),
-                            (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
-                            (sources[i].term == null) ? nullText : new Text(sources[i].term + "\0" + range.getStartKey().getColumnQualifier()));
-                } // Build a key with just the term.
-                else {
-                    sourceKey = buildKey(getPartition(range.getStartKey()),
-                            (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
-                            (sources[i].term == null) ? nullText : sources[i].term);
-                }
-
-                sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
-            } else {
-                sources[i].iter.seek(range, seekColumnFamilies, inclusive);
-            }
+        
+        if (log.isDebugEnabled()) {
+          log.debug("We went too far, update the currentDocID to be the location of where were seek'ed to");
         }
-
-        advanceToIntersection();
-
-        if (hasTop()) {
-            if (overallRange != null && !overallRange.contains(topKey)) {
-                topKey = null;
-                if (log.isDebugEnabled()) {
-                    log.debug("seek, topKey is outside of overall range: " + overallRange);
-                }
-            }
+        
+        currentDocID.set(docid);
+        advancedCursor = true;
+        break;
+      }
+      
+      // Set the term as currentTerm (in case we found this record on the first try)
+      currentTerm = getTerm(ts.iter.getTopKey());
+      
+      if (log.isDebugEnabled()) {
+        log.debug("currentTerm = " + currentTerm);
+      }
+      
+      // If we're negated, next() the first TermSource since we guaranteed it was not a NOT term
+      if (ts.notFlag) {
+        sources[0].iter.next();
+        advancedCursor = true;
+      }
+      
+      // If we got here, we have a match
+      break;
+    }
+    
+    return advancedCursor;
+  }
+  
+  public void next() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In ModifiedIntersectingIterator.next()");
+    }
+    
+    if (currentRow == null) {
+      return;
+    }
+    
+    // precondition: the current row is set up and the sources all have the same column qualifier
+    // while we don't have a match, seek in the source with the smallest column qualifier
+    sources[0].iter.next();
+    
+    advanceToIntersection();
+    
+    if (hasTop()) {
+      if (overallRange != null && !overallRange.contains(topKey)) {
+        topKey = null;
+      }
+    }
+  }
+  
+  protected void advanceToIntersection() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In AndIterator.advanceToIntersection()");
+    }
+    
+    boolean cursorChanged = true;
+    int numSeeks = 0;
+    while (cursorChanged) {
+      // seek all of the sources to at least the highest seen column qualifier in the current row
+      cursorChanged = false;
+      for (TermSource ts : sources) {
+        if (currentRow == null) {
+          topKey = null;
+          return;
         }
-    }
-
-    public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env,
-            Text term, boolean notFlag) {
-        addSource(source, env, null, term, notFlag);
-    }
-
-    public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env,
-            Text dataLocation, Text term, boolean notFlag) {
-        // Check if we have space for the added Source
-        if (sources == null) {
-            sources = new TermSource[1];
-        } else {
-            // allocate space for node, and copy current tree.
-            // TODO:  Should we change this to an ArrayList so that we can just add() ?
-            TermSource[] localSources = new TermSource[sources.length + 1];
-            int currSource = 0;
-            for (TermSource myTerm : sources) {
-                // TODO:  Do I need to call new here? or can I just re-use the term?
-                localSources[currSource] = new TermSource(myTerm);
-                currSource++;
-            }
-            sources = localSources;
+        numSeeks++;
+        if (seekOneSource(ts)) {
+          cursorChanged = true;
+          break;
+        }
+      }
+    }
+    
+    topKey = buildKey(currentRow, currentTerm, currentDocID);
+    
+    if (log.isDebugEnabled()) {
+      log.debug("ModifiedIntersectingIterator: Got a match: " + topKey);
+    }
+  }
+  
+  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+    if (iter.hasTop()) {
+      return iter.getTopKey().toString();
+    }
+    return "";
+  }
+  
+  public static final String columnFamiliesOptionName = "columnFamilies";
+  public static final String termValuesOptionName = "termValues";
+  public static final String notFlagsOptionName = "notFlags";
+  
+  /**
+   * Encode a <code>Text</code> array of all the columns to intersect on
+   * 
+   * @param columns
+   *          The columns to be encoded
+   * @return
+   */
+  public static String encodeColumns(Text[] columns) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < columns.length; i++) {
+      sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+  
+  /**
+   * Encode a <code>Text</code> array of all of the terms to intersect on. The terms should match the columns in a one-to-one manner
+   * 
+   * @param terms
+   *          The terms to be encoded
+   * @return
+   */
+  public static String encodeTermValues(Text[] terms) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < terms.length; i++) {
+      sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(terms[i]))));
+      sb.append('\n');
+    }
+    
+    return sb.toString();
+  }
+  
+  /**
+   * Encode an array of <code>booleans</code> denoting which columns are NOT'ed
+   * 
+   * @param flags
+   *          The array of NOTs
+   * @return
+   */
+  public static String encodeBooleans(boolean[] flags) {
+    byte[] bytes = new byte[flags.length];
+    for (int i = 0; i < flags.length; i++) {
+      if (flags[i]) {
+        bytes[i] = 1;
+      } else {
+        bytes[i] = 0;
+      }
+    }
+    return new String(Base64.encodeBase64(bytes));
+  }
+  
+  /**
+   * Decode the encoded columns into a <code>Text</code> array
+   * 
+   * @param columns
+   *          The Base64 encoded String of the columns
+   * @return
+   */
+  public static Text[] decodeColumns(String columns) {
+    String[] columnStrings = columns.split("\n");
+    Text[] columnTexts = new Text[columnStrings.length];
+    for (int i = 0; i < columnStrings.length; i++) {
+      columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
+    }
+    
+    return columnTexts;
+  }
+  
+  /**
+   * Decode the encoded terms into a <code>Text</code> array
+   * 
+   * @param terms
+   *          The Base64 encoded String of the terms
+   * @return
+   */
+  public static Text[] decodeTermValues(String terms) {
+    String[] termStrings = terms.split("\n");
+    Text[] termTexts = new Text[termStrings.length];
+    for (int i = 0; i < termStrings.length; i++) {
+      termTexts[i] = new Text(Base64.decodeBase64(termStrings[i].getBytes()));
+    }
+    
+    return termTexts;
+  }
+  
+  /**
+   * Decode the encoded NOT flags into a <code>boolean</code> array
+   * 
+   * @param flags
+   * @return
+   */
+  public static boolean[] decodeBooleans(String flags) {
+    // return null if there were no flags
+    if (flags == null) {
+      return null;
+    }
+    
+    byte[] bytes = Base64.decodeBase64(flags.getBytes());
+    boolean[] bFlags = new boolean[bytes.length];
+    for (int i = 0; i < bytes.length; i++) {
+      if (bytes[i] == 1) {
+        bFlags[i] = true;
+      } else {
+        bFlags[i] = false;
+      }
+    }
+    
+    return bFlags;
+  }
+  
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In AndIterator.init()");
+    }
+    
+    Text[] dataLocations = decodeColumns(options.get(columnFamiliesOptionName));
+    Text[] terms = decodeTermValues(options.get(termValuesOptionName));
+    boolean[] notFlags = decodeBooleans(options.get(notFlagsOptionName));
+    
+    if (terms.length < 2) {
+      throw new IllegalArgumentException("AndIterator requires two or more columns families");
+    }
+    
+    // Scan the not flags.
+    // There must be at least one term that isn't negated
+    // And we are going to re-order such that the first term is not a ! term
+    if (notFlags == null) {
+      notFlags = new boolean[terms.length];
+      for (int i = 0; i < terms.length; i++) {
+        notFlags[i] = false;
+      }
+    }
+    
+    // Make sure that the first dataLocation/Term is not a NOT by swapping it with a later dataLocation/Term
+    if (notFlags[0]) {
+      for (int i = 1; i < notFlags.length; i++) {
+        if (notFlags[i] == false) {
+          // Swap the terms
+          Text swap = new Text(terms[0]);
+          terms[0].set(terms[i]);
+          terms[i].set(swap);
+          
+          // Swap the dataLocations
+          swap.set(dataLocations[0]);
+          dataLocations[0].set(dataLocations[i]);
+          dataLocations[i].set(swap);
+          
+          // Flip the notFlags
+          notFlags[0] = false;
+          notFlags[i] = true;
+          break;
+        }
+      }
+      
+      if (notFlags[0]) {
+        throw new IllegalArgumentException("AndIterator requires at least one column family without not");
+      }
+    }
+    
+    // Build up the array of sources that are to be intersected
+    sources = new TermSource[dataLocations.length];
+    sources[0] = new TermSource(source, dataLocations[0], terms[0]);
+    for (int i = 1; i < dataLocations.length; i++) {
+      sources[i] = new TermSource(source.deepCopy(env), dataLocations[i], terms[i], notFlags[i]);
+    }
+    
+    sourcesCount = dataLocations.length;
+  }
+  
+  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("In AndIterator.seek()");
+      log.debug("AndIterator.seek Given range => " + range);
+    }
+    // if (firstSeek) {
+    overallRange = new Range(range);
+    // firstSeek = false;
+    // }
+    if (range.getEndKey() != null && range.getEndKey().getRow() != null) {
+      this.parentEndRow = range.getEndKey().getRow();
+    }
+    
+    // overallRange = new Range(range);
+    currentRow = new Text();
+    currentDocID.set(emptyByteArray);
+    
+    this.seekColumnFamilies = seekColumnFamilies;
+    this.inclusive = inclusive;
+    
+    // seek each of the sources to the right column family within the row given by key
+    for (int i = 0; i < sourcesCount; i++) {
+      Key sourceKey;
+      if (range.getStartKey() != null) {
+        // Build a key with the DocID if one is given
+        if (range.getStartKey().getColumnQualifier() != null) {
+          sourceKey = buildKey(getPartition(range.getStartKey()), (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
+              (sources[i].term == null) ? nullText : new Text(sources[i].term + "\0" + range.getStartKey().getColumnQualifier()));
+        } // Build a key with just the term.
+        else {
+          sourceKey = buildKey(getPartition(range.getStartKey()), (sources[i].dataLocation == null) ? nullText : sources[i].dataLocation,
+              (sources[i].term == null) ? nullText : sources[i].term);
+        }
+        
+        sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
+      } else {
+        sources[i].iter.seek(range, seekColumnFamilies, inclusive);
+      }
+    }
+    
+    advanceToIntersection();
+    
+    if (hasTop()) {
+      if (overallRange != null && !overallRange.contains(topKey)) {
+        topKey = null;
+        if (log.isDebugEnabled()) {
+          log.debug("seek, topKey is outside of overall range: " + overallRange);
         }
-
-        sources[sourcesCount] = new TermSource(source.deepCopy(env), dataLocation, term, notFlag);
-        sourcesCount++;
+      }
     }
-
-    public boolean jump(Key jumpKey) throws IOException {
+  }
+  
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
+    addSource(source, env, null, term, notFlag);
+  }
+  
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text dataLocation, Text term, boolean notFlag) {
+    // Check if we have space for the added Source
+    if (sources == null) {
+      sources = new TermSource[1];
+    } else {
+      // allocate space for node, and copy current tree.
+      // TODO: Should we change this to an ArrayList so that we can just add() ?
+      TermSource[] localSources = new TermSource[sources.length + 1];
+      int currSource = 0;
+      for (TermSource myTerm : sources) {
+        // TODO: Do I need to call new here? or can I just re-use the term?
+        localSources[currSource] = new TermSource(myTerm);
+        currSource++;
+      }
+      sources = localSources;
+    }
+    
+    sources[sourcesCount] = new TermSource(source.deepCopy(env), dataLocation, term, notFlag);
+    sourcesCount++;
+  }
+  
+  public boolean jump(Key jumpKey) throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("jump: " + jumpKey);
+    }
+    
+    // is the jumpKey outside my overall range?
+    if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
+      // can't go there.
+      if (log.isDebugEnabled()) {
+        log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
+      }
+      return false;
+    }
+    
+    if (!hasTop()) {
+      // TODO: will need to add current/last row if you want to measure if
+      // we don't have topkey because we hit end of tablet.
+      
+      if (log.isDebugEnabled()) {
+        log.debug("jump called, but topKey is null, must need to move to next row");
+      }
+      // call seek with the jumpKey
+      
+      Key endKey = null;
+      if (parentEndRow != null) {
+        endKey = new Key(parentEndRow);
+      }
+      Range newRange = new Range(jumpKey, true, endKey, false);
+      this.seek(newRange, seekColumnFamilies, false);
+      // the parent seek should account for the endKey range check.
+      return hasTop();
+    } else {
+      
+      int comp = this.topKey.getRow().compareTo(jumpKey.getRow());
+      // compare rows
+      if (comp > 0) {
         if (log.isDebugEnabled()) {
-            log.debug("jump: " + jumpKey);
+          log.debug("jump, our row is ahead of jumpKey.");
+          log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
         }
-
-        // is the jumpKey outside my overall range?
-        if (parentEndRow != null && parentEndRow.compareTo(jumpKey.getRow()) < 0) {
-            //can't go there.
-            if (log.isDebugEnabled()) {
-                log.debug("jumpRow: " + jumpKey.getRow() + " is greater than my parentEndRow: " + parentEndRow);
-            }
-            return false;
+        return hasTop(); // do nothing, we're ahead of jumpKey row
+      } else if (comp < 0) { // a row behind jump key, need to move forward
+      
+        if (log.isDebugEnabled()) {
+          log.debug("II jump, row jump");
         }
-
-        if (!hasTop()) {
-            //TODO: will need to add current/last row if you want to measure if
-            //we don't have topkey because we hit end of tablet.
-
-            if (log.isDebugEnabled()) {
-                log.debug("jump called, but topKey is null, must need to move to next row");
-            }
-            // call seek with the jumpKey
-
-            Key endKey = null;
-            if (parentEndRow != null) {
-                endKey = new Key(parentEndRow);
-            }
-            Range newRange = new Range(jumpKey, true, endKey, false);
-            this.seek(newRange, seekColumnFamilies, false);
-            // the parent seek should account for the endKey range check.
-            return hasTop();
-        } else {
-
-            int comp = this.topKey.getRow().compareTo(jumpKey.getRow());
-            //compare rows
-            if (comp > 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("jump, our row is ahead of jumpKey.");
-                    log.debug("jumpRow: " + jumpKey.getRow() + " myRow: " + topKey.getRow() + " parentEndRow" + parentEndRow);
-                }
-                return hasTop();  // do nothing, we're ahead of jumpKey row
-            } else if (comp < 0) { //a row behind jump key, need to move forward
-
-                if (log.isDebugEnabled()) {
-                    log.debug("II jump, row jump");
-                }
-                Key endKey = null;
-                if (parentEndRow != null) {
-                    endKey = new Key(parentEndRow);
-                }
-                Key sKey = new Key(jumpKey.getRow());
-                Range fake = new Range(sKey, true, endKey, false);
-                this.seek(fake, this.seekColumnFamilies, false);
-                return hasTop();
-            } else {
-                //need to check uid
-                String myUid = getUID(this.topKey);
-                String jumpUid = getUID(jumpKey);
-                if (log.isDebugEnabled()) {
-                    if (myUid == null) {
-                        log.debug("myUid is null");
-                    } else {
-                        log.debug("myUid: " + myUid);
-                    }
-
-                    if (jumpUid == null) {
-                        log.debug("jumpUid is null");
-                    } else {
-                        log.debug("jumpUid: " + jumpUid);
-                    }
-                }
-
-                int ucomp = myUid.compareTo(jumpUid);
-                if (ucomp < 0) { //need to move all sources forward
-                    if (log.isDebugEnabled()) {
-                        log.debug("jump, uid jump");
-                    }
-                    // move one, and then advanceToIntersection will move the rest.
-                    Text row = jumpKey.getRow();
-                    String cq = topKey.getColumnQualifier().toString();
-                    cq = cq.replaceAll(myUid, jumpUid);
-
-                    Key startKey = buildKey(row, topKey.getColumnFamily(), new Text(cq));
-                    Range range = new Range(startKey, true, null, false);
-                    sources[0].iter.seek(range, seekColumnFamilies, true);
-                    advanceToIntersection();
-
-                    // make sure it is in the range if we have one.
-                    if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
-                        topKey = null;
-                    }
-                    if (log.isDebugEnabled() && hasTop()) {
-                        log.debug("jump, topKey is now: " + topKey);
-                    }
-                    return hasTop();
-
-
-                }//else do nothing
-                if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
-                    topKey = null;
-                }
-                return hasTop();
-            }
+        Key endKey = null;
+        if (parentEndRow != null) {
+          endKey = new Key(parentEndRow);
+        }
+        Key sKey = new Key(jumpKey.getRow());
+        Range fake = new Range(sKey, true, endKey, false);
+        this.seek(fake, this.seekColumnFamilies, false);
+        return hasTop();
+      } else {
+        // need to check uid
+        String myUid = getUID(this.topKey);
+        String jumpUid = getUID(jumpKey);
+        if (log.isDebugEnabled()) {
+          if (myUid == null) {
+            log.debug("myUid is null");
+          } else {
+            log.debug("myUid: " + myUid);
+          }
+          
+          if (jumpUid == null) {
+            log.debug("jumpUid is null");
+          } else {
+            log.debug("jumpUid: " + jumpUid);
+          }
+        }
+        
+        int ucomp = myUid.compareTo(jumpUid);
+        if (ucomp < 0) { // need to move all sources forward
+          if (log.isDebugEnabled()) {
+            log.debug("jump, uid jump");
+          }
+          // move one, and then advanceToIntersection will move the rest.
+          Text row = jumpKey.getRow();
+          String cq = topKey.getColumnQualifier().toString();
+          cq = cq.replaceAll(myUid, jumpUid);
+          
+          Key startKey = buildKey(row, topKey.getColumnFamily(), new Text(cq));
+          Range range = new Range(startKey, true, null, false);
+          sources[0].iter.seek(range, seekColumnFamilies, true);
+          advanceToIntersection();
+          
+          // make sure it is in the range if we have one.
+          if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
+            topKey = null;
+          }
+          if (log.isDebugEnabled() && hasTop()) {
+            log.debug("jump, topKey is now: " + topKey);
+          }
+          return hasTop();
+          
+        }// else do nothing
+        if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) {
+          topKey = null;
         }
+        return hasTop();
+      }
     }
+  }
 }



Mime
View raw message