incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1245201 - in /incubator/accumulo/branches/1.4/src/core/src: main/java/org/apache/accumulo/core/iterators/user/ test/java/org/apache/accumulo/core/iterators/user/
Date Thu, 16 Feb 2012 21:58:39 GMT
Author: kturner
Date: Thu Feb 16 21:58:38 2012
New Revision: 1245201

URL: http://svn.apache.org/viewvc?rev=1245201&view=rev
Log:
ACCUMULO-403 Initial checkin of RowFilter

Added:
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
    incubator/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java
Modified:
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java

Added: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java?rev=1245201&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
(added)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
Thu Feb 16 21:58:38 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This iterator makes it easy to select rows that meet given criteria. It is an alternative
to the {@link WholeRowIterator}. There are a few things to consider
+ * when deciding which one to use.
+ * 
+ * First the WholeRowIterator requires that the row fit in memory and that the entire row
is read before a decision is made. This iterator has neither
+ * requirement; it allows seeking within a row to avoid reading the entire row to make a
decision. So even if your rows fit into memory, extending this
+ * iterator may be a better choice because you can seek.
+ * 
+ * Second the WholeRowIterator is currently the only way to achieve row isolation with the
{@link BatchScanner}. With the normal {@link Scanner} row isolation
+ * can be enabled and this Iterator may be used.
+ * 
+ * Third the row acceptance test will be executed every time this Iterator is seeked. If
the row is large, then the row will be fetched in batches of key/values.
+ * As each batch is fetched the test may be re-executed because the iterator stack is reseeked
for each batch. The batch size may be increased to reduce the
+ * number of times the test is executed. With the normal Scanner, if isolation is enabled
then it will read an entire row w/o seeking this iterator.
+ * 
+ */
+public abstract class RowFilter extends WrappingIterator {
+
+  /**
+   * Number of next() calls made on the source inside a rejected row before giving up on stepping and seeking past the row instead.
+   */
+  private static final int MAX_NEXT_BEFORE_SEEK = 10;
+
+  // Deep copy of the source (created in init) that is handed to acceptRow, so the row test never moves the source itself.
+  private SortedKeyValueIterator<Key,Value> decisionIterator;
+  // Column families, inclusive flag and range of the most recent seek(); reused for every re-seek done by skipRows().
+  private Collection<ByteSequence> columnFamilies;
+  // Row most recently accepted by acceptRow, or null when the last examined row was rejected or nothing has been examined since seek().
+  Text currentRow;
+  private boolean inclusive;
+  private Range range;
+  // Set to true by seek(); cleared by skipRows() when a rejected row is the last thing left in the seek range.
+  private boolean hasTop;
+
+  /**
+   * Advances the source until its top key lies in a row accepted by {@link #acceptRow(SortedKeyValueIterator)}, or until the source has no top.
+   *
+   * Keys of a row that was already accepted are passed through without re-running the test. A rejected row is skipped by calling next() up to
+   * {@value #MAX_NEXT_BEFORE_SEEK} times; if the source is still inside that row afterwards, the source is re-seeked to the part of the current seek range
+   * that follows the row.
+   *
+   * @throws IOException
+   *           if seeking or advancing an underlying iterator, or the row test itself, fails
+   */
+  private void skipRows() throws IOException {
+    SortedKeyValueIterator<Key,Value> source = getSource();
+    while (source.hasTop()) {
+      Text row = source.getTopKey().getRow();
+
+      // Still inside the row that was accepted earlier: nothing to decide.
+      if (currentRow != null && currentRow.equals(row))
+        break;
+
+      // Position the decision iterator on this row only, using the column families of the last seek.
+      decisionIterator.seek(new Range(row), columnFamilies, inclusive);
+
+      if (acceptRow(decisionIterator)) {
+        currentRow = row;
+        break;
+      }
+
+      currentRow = null;
+
+      // Try to step out of the rejected row with a bounded number of next() calls first.
+      int count = 0;
+      while (source.hasTop() && count < MAX_NEXT_BEFORE_SEEK && source.getTopKey().getRow().equals(row)) {
+        count++;
+        source.next();
+      }
+
+      if (source.hasTop() && source.getTopKey().getRow().equals(row)) {
+        // Still inside the rejected row: jump to whatever part of the seek range comes after it.
+        Range nextRow = new Range(row, false, null, false);
+        nextRow = range.clip(nextRow, true);
+        if (nextRow == null) {
+          // clip produced no range, so this iterator reports no top. Stop here: looping again would only re-seek the
+          // decision iterator and re-run acceptRow on the row that was just rejected.
+          hasTop = false;
+          break;
+        }
+        source.seek(nextRow, columnFamilies, inclusive);
+      }
+    }
+  }
+
+  /**
+   * Decides whether a row is kept. Implementation should return false to suppress a row.
+   *
+   * @param rowIterator
+   *          an iterator that has been seeked to the range of the single row under test, using the column families of the most recent seek of this iterator
+   * @return true if the row should be returned, false if it should be suppressed
+   * @throws IOException
+   *           if reading from rowIterator fails
+   */
+  public abstract boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException;
+
+  /**
+   * Initializes the wrapped source and creates the deep copy of it that is later passed to {@link #acceptRow(SortedKeyValueIterator)}.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    this.decisionIterator = source.deepCopy(env);
+  }
+
+  /**
+   * Has a top only while the source has one and skipRows() has not determined that the seek range is used up.
+   */
+  @Override
+  public boolean hasTop() {
+    return hasTop && super.hasTop();
+  }
+
+  /**
+   * Advances the source by one key and then skips forward over any rejected rows.
+   */
+  @Override
+  public void next() throws IOException {
+    super.next();
+    skipRows();
+  }
+
+  /**
+   * Seeks the source, remembers the seek arguments for later re-seeks, resets the accepted-row state and positions on the first key of an accepted row.
+   */
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    this.columnFamilies = columnFamilies;
+    this.inclusive = inclusive;
+    this.range = range;
+    currentRow = null;
+    hasTop = true;
+    skipRows();
+  }
+}

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java?rev=1245201&r1=1245200&r2=1245201&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
(original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
Thu Feb 16 21:58:38 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.io.Text;
  * <p>
  * To regain the original key/value pairs of the row, call the decodeRow function on the
key/value pair that this iterator returned.
  * 
+ * @see RowFilter
  */
 public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
   

Added: incubator/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java?rev=1245201&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java
(added)
+++ incubator/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java
Thu Feb 16 21:58:38 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Exercises {@link RowFilter} through a scanner on a {@link MockInstance}, using a filter that accepts a row only when its values sum to 2.
+ */
+
+public class RowFilterTest extends TestCase {
+
+  /**
+   * Accepts a row when the integer values of all entries visible through the row iterator add up to exactly 2.
+   */
+  public static class SummingRowFilter extends RowFilter {
+
+    @Override
+    public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException {
+      int sum = 0;
+
+      while (rowIterator.hasTop()) {
+        sum += Integer.parseInt(rowIterator.getTopValue().toString());
+        rowIterator.next();
+      }
+
+      return sum == 2;
+    }
+
+  }
+
+  /**
+   * Builds a mutation with eleven entries of value "1": cf1:cq1 through cf1:cq9, then cf2:cq1 and cf2:cq2.
+   */
+  private static Mutation elevenOnes(String row) {
+    Mutation m = new Mutation(row);
+    for (int i = 1; i <= 9; i++) {
+      m.put("cf1", "cq" + i, "1");
+    }
+    m.put("cf2", "cq1", "1");
+    m.put("cf2", "cq2", "1");
+    return m;
+  }
+
+  public void test1() throws Exception {
+    MockInstance instance = new MockInstance("rft1");
+    Connector conn = instance.getConnector("", "".getBytes());
+
+    conn.tableOperations().create("table1");
+    BatchWriter bw = conn.createBatchWriter("table1", 1000000, 60000, 1);
+
+    // Row "0": eleven entries, sum 11.
+    bw.addMutation(elevenOnes("0"));
+
+    // Row "1": sum 3.
+    Mutation m = new Mutation("1");
+    m.put("cf1", "cq1", "1");
+    m.put("cf1", "cq2", "2");
+    bw.addMutation(m);
+
+    // Row "2": sum 2.
+    m = new Mutation("2");
+    m.put("cf1", "cq1", "1");
+    m.put("cf1", "cq2", "1");
+    bw.addMutation(m);
+
+    // Row "3": sum 2.
+    m = new Mutation("3");
+    m.put("cf1", "cq1", "0");
+    m.put("cf1", "cq2", "2");
+    bw.addMutation(m);
+
+    // Row "4": eleven entries, sum 11.
+    bw.addMutation(elevenOnes("4"));
+
+    // Close the writer before scanning so the test does not depend on unflushed writes being visible.
+    bw.close();
+
+    IteratorSetting is = new IteratorSetting(40, SummingRowFilter.class);
+    conn.tableOperations().attachIterator("table1", is);
+
+    // All columns: only rows "2" and "3" sum to 2.
+    Scanner scanner = conn.createScanner("table1", Constants.NO_AUTHS);
+    assertEquals(new HashSet<String>(Arrays.asList("2", "3")), getRows(scanner));
+
+    // Only cf1:cq2 fetched: rows "1" and "3" hold the value 2 there.
+    scanner.fetchColumn(new Text("cf1"), new Text("cq2"));
+    assertEquals(new HashSet<String>(Arrays.asList("1", "3")), getRows(scanner));
+
+    // Only cf1:cq1 fetched: no row holds the value 2 there.
+    scanner.clearColumns();
+    scanner.fetchColumn(new Text("cf1"), new Text("cq1"));
+    assertEquals(new HashSet<String>(), getRows(scanner));
+
+    // Explicit range covering rows "0" through "4", all columns.
+    scanner.setRange(new Range("0", "4"));
+    scanner.clearColumns();
+    assertEquals(new HashSet<String>(Arrays.asList("2", "3")), getRows(scanner));
+
+    // Range restricted to a single accepted row.
+    scanner.setRange(new Range("2"));
+    scanner.clearColumns();
+    assertEquals(new HashSet<String>(Arrays.asList("2")), getRows(scanner));
+
+    // Range restricted to a single rejected row with more than ten entries.
+    scanner.setRange(new Range("4"));
+    scanner.clearColumns();
+    assertEquals(new HashSet<String>(), getRows(scanner));
+
+    // Same row, but with only two columns fetched its visible values sum to 2.
+    scanner.setRange(new Range("4"));
+    scanner.clearColumns();
+    scanner.fetchColumn(new Text("cf1"), new Text("cq2"));
+    scanner.fetchColumn(new Text("cf1"), new Text("cq4"));
+    assertEquals(new HashSet<String>(Arrays.asList("4")), getRows(scanner));
+
+  }
+
+  /**
+   * Collects the distinct row ids of every entry the scanner returns.
+   */
+  private HashSet<String> getRows(Scanner scanner) {
+    HashSet<String> rows = new HashSet<String>();
+    for (Entry<Key,Value> entry : scanner) {
+      rows.add(entry.getKey().getRow().toString());
+    }
+    return rows;
+  }
+}



Mime
View raw message