accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1434955 - in /accumulo/trunk/core/src: main/java/org/apache/accumulo/core/iterators/user/ test/java/org/apache/accumulo/core/iterators/user/
Date Thu, 17 Jan 2013 23:17:20 GMT
Author: kturner
Date: Thu Jan 17 23:17:20 2013
New Revision: 1434955

URL: http://svn.apache.org/viewvc?rev=1434955&view=rev
Log:
ACCUMULO-956 generalized transforming iterator, added some sanity checks to it, added some
more unit tests, added some static config methods

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
      - copied, changed from r1434936, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
      - copied, changed from r1434936, accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
Removed:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java

Copied: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
(from r1434936, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java)
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java?p2=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java&p1=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java&r1=1434936&r2=1434955&rev=1434955&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
Thu Jan 17 23:17:20 2013
@@ -23,7 +23,10 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -40,12 +43,13 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.VisibilityParseException;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.collections.BufferOverflowException;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 /**
- * The KeyTransformingIterator allows portions of a key (except for the row) to be transformed.
This iterator handles the details that come with modifying keys
+ * The TransformingIterator allows portions of a key (except for the row) to be transformed.
This iterator handles the details that come with modifying keys
  * (i.e., that the sort order could change). In order to do so, however, the iterator must
put all keys sharing the same prefix in memory. Prefix is defined as
  * the parts of the key that are not modified by this iterator. That is, if the iterator
modifies column qualifier and timestamp, then the prefix is row and
  * column family. In that case, the iterator must load all column qualifiers for each row/column
family pair into memory. Given this constraint, care must be
@@ -70,8 +74,11 @@ import org.apache.log4j.Logger;
  * major and minor compactions. It should also be noted that this iterator implements the
security filtering rather than relying on a follow-on iterator to do
  * it so that we ensure the test is performed.
  */
-abstract public class KeyTransformingIterator extends WrappingIterator implements OptionDescriber
{
+abstract public class TransformingIterator extends WrappingIterator implements OptionDescriber
{
   public static final String AUTH_OPT = "authorizations";
+  public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
+  private static final long DEFAULT_MAX_BUFFER_SIZE = 10000000;
+
   protected Logger log = Logger.getLogger(getClass());
   
   protected ArrayList<Pair<Key,Value>> keys = new ArrayList<Pair<Key,Value>>();
@@ -84,6 +91,7 @@ abstract public class KeyTransformingIte
   private VisibilityEvaluator ve = null;
   private LRUMap visibleCache = null;
   private LRUMap parsedVisibilitiesCache = null;
+  private long maxBufferSize;
   
   private static Comparator<Pair<Key,Value>> keyComparator = new Comparator<Pair<Key,Value>>()
{
     @Override
@@ -92,7 +100,7 @@ abstract public class KeyTransformingIte
     }
   };
   
-  public KeyTransformingIterator() {}
+  public TransformingIterator() {}
   
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) throws IOException {
@@ -101,11 +109,17 @@ abstract public class KeyTransformingIte
     if (scanning) {
       String auths = options.get(AUTH_OPT);
       if (auths != null && !auths.isEmpty()) {
-        ve = new VisibilityEvaluator(new Authorizations(auths.split(",")));
+        ve = new VisibilityEvaluator(new Authorizations(auths.getBytes()));
         visibleCache = new LRUMap(100);
       }
     }
     
+    if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
+      maxBufferSize = AccumuloConfiguration.getMemoryInBytes(options.get(MAX_BUFFER_SIZE_OPT));
+    } else {
+      maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
+    }
+
     parsedVisibilitiesCache = new LRUMap(100);
   }
   
@@ -124,7 +138,7 @@ abstract public class KeyTransformingIte
   
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    KeyTransformingIterator copy;
+    TransformingIterator copy;
     
     try {
       copy = getClass().newInstance();
@@ -208,7 +222,60 @@ abstract public class KeyTransformingIte
       transformKeys();
     }
   }
-  
+
+  private class RangeIterator implements SortedKeyValueIterator<Key,Value> {
+    
+    private SortedKeyValueIterator<Key,Value> source;
+    private Key prefixKey;
+    private boolean hasTop = false;
+    
+    RangeIterator(SortedKeyValueIterator<Key,Value> source, Key prefixKey) {
+      this.source = source;
+      this.prefixKey = prefixKey;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean hasTop() {
+      // only have a top if the prefix matches
+      return hasTop = source.hasTop() && source.getTopKey().equals(prefixKey, getKeyPrefix());
+    }
+    
+    @Override
+    public void next() throws IOException {
+      // do not let user advance too far and try to avoid reexecuting hasTop()
+      if (!hasTop && !hasTop())
+        throw new NoSuchElementException();
+      hasTop = false;
+      source.next();
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean
inclusive) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Key getTopKey() {
+      return source.getTopKey();
+    }
+    
+    @Override
+    public Value getTopValue() {
+      return source.getTopValue();
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+  }
+
   /**
    * Reads all keys matching the first key's prefix from the source iterator, transforms
them, and sorts the resulting keys. Transformed keys that fall outside
    * of our seek range or can't be seen by the user are excluded.
@@ -216,32 +283,37 @@ abstract public class KeyTransformingIte
   protected void transformKeys() throws IOException {
     keyPos = -1;
     keys.clear();
-    Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null;
+    final Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null;
     
-    while (super.hasTop()) {
-      Key sourceTopKey = super.getTopKey();
+    transformRange(new RangeIterator(getSource(), prefixKey), new KVBuffer() {
       
-      // If the source key equals our prefix key (up to the prefix), then
-      // we have a key that needs transformed. Otherwise, we're done.
-      if (sourceTopKey.equals(prefixKey, getKeyPrefix())) {
-        Key transformedKey = transformKey(sourceTopKey);
-        
-        // If the transformed key didn't actually change, then we need
-        // to make a copy since we're caching it in our keys list.
-        if (transformedKey == sourceTopKey)
-          transformedKey = new Key(sourceTopKey);
-        // We could check that the transformed key didn't transform anything
-        // in the key prefix here...
+      long appened = 0;
+
+      @Override
+      public void append(Key key, Value val) {
+        // ensure the key provided by the user has the correct prefix
+        if (!key.equals(prefixKey, getKeyPrefix()))
+          throw new IllegalArgumentException("Key prefixes are not equal " + key + " " +
prefixKey);
         
         // Transformation could have produced a key that falls outside
         // of the seek range, or one that the user cannot see. Check
         // these before adding it to the output list.
-        if (includeTransformedKey(transformedKey))
-          keys.add(new Pair<Key,Value>(transformedKey, new Value(super.getTopValue())));
-      } else {
-        break;
+        if (includeTransformedKey(key)) {
+          
+          // try to defend against a scan or compaction using all memory in a tablet server
+          if (appened > maxBufferSize)
+            throw new BufferOverflowException("Exceeded buffer size of " + maxBufferSize
+ ", prefixKey: " + prefixKey);
+
+          if (getSource().hasTop() && key == getSource().getTopKey())
+            key = new Key(key);
+          keys.add(new Pair<Key,Value>(key, new Value(val)));
+          appened += (key.getSize() + val.getSize() + 128);
+        }
       }
-      
+    });
+
+    // consume any key in range that user did not consume
+    while (super.hasTop() && super.getTopKey().equals(prefixKey, getKeyPrefix()))
{
       super.next();
     }
     
@@ -531,19 +603,49 @@ abstract public class KeyTransformingIte
    */
   abstract protected PartialKey getKeyPrefix();
   
+  public static interface KVBuffer {
+    void append(Key key, Value val);
+  }
+  
   /**
    * Transforms {@code originalKey}. This method must not change the row part of the key,
and must only change the parts of the key after the return value of
    * {@link #getKeyPrefix()}. Implementors must also remember to copy the delete flag from
{@code originalKey} onto the new key. Or, implementors should use one
    * of the helper methods to produce the new key. See any of the replaceKeyParts methods.
    * 
-   * @param originalKey
-   *          the key to be transformed
-   * @return the modified key
+   * @param input
+   *          An iterator over a group of keys with the same prefix. This iterator provides
an efficient view, bounded by the prefix, of the underlying iterator
+   *          and can not be seeked.
+   * @param output
+   *          An output buffer that holds transformed key values. All key values added to
the buffer must have the same prefix as the input keys.
+   * @throws IOException
    * @see #replaceColumnFamily(Key, Text)
    * @see #replaceColumnQualifier(Key, Text)
    * @see #replaceColumnVisibility(Key, Text)
    * @see #replaceKeyParts(Key, Text, Text)
    * @see #replaceKeyParts(Key, Text, Text, Text)
    */
-  abstract protected Key transformKey(Key originalKey);
+  
+  abstract protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException;
+  
+  /**
+   * Configure authorizations used for post-transformation filtering.
+   * 
+   * @param config
+   * @param auths
+   */
+  public static void setAutorizations(IteratorSetting config, Authorizations auths) {
+    config.addOption(AUTH_OPT, auths.serialize());
+  }
+  
+  /**
+   * Configure the maximum amount of memory that can be used for transformation. If this
memory is exceeded an exception will be thrown.
+   * 
+   * @param config
+   * @param maxBufferSize
+   *          size in bytes
+   */
+  public static void setMaxBufferSize(IteratorSetting config, long maxBufferSize) {
+    config.addOption(MAX_BUFFER_SIZE_OPT, maxBufferSize + "");
+  }
+
 }

Copied: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
(from r1434936, accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java)
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java?p2=accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java&p1=accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java&r1=1434936&r2=1434955&rev=1434955&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
(original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
Thu Jan 17 23:17:20 2013
@@ -58,7 +58,7 @@ import org.apache.hadoop.io.Text;
 import org.junit.Before;
 import org.junit.Test;
 
-public class KeyTransformingIteratorTest {
+public class TransformingIteratorTest {
   private static final String TABLE_NAME = "test_table";
   private static Authorizations authorizations = new Authorizations("vis0", "vis1", "vis2",
"vis3", "vis4");
   private Connector connector;
@@ -88,10 +88,10 @@ public class KeyTransformingIteratorTest
     scanner.addScanIterator(new IteratorSetting(20, ReuseIterator.class));
   }
   
-  private void setUpTransformIterator(Class<? extends KeyTransformingIterator> clazz)
{
+  private void setUpTransformIterator(Class<? extends TransformingIterator> clazz)
{
     IteratorSetting cfg = new IteratorSetting(21, clazz);
     cfg.setName("keyTransformIter");
-    cfg.addOption(KeyTransformingIterator.AUTH_OPT, "vis0, vis1, vis2, vis3");
+    TransformingIterator.setAutorizations(cfg, new Authorizations("vis0", "vis1", "vis2",
"vis3"));
     scanner.addScanIterator(cfg);
   }
   
@@ -129,7 +129,7 @@ public class KeyTransformingIteratorTest
       setUpTransformIterator(clazz);
       
       // All rows with visibilities reversed
-      KeyTransformingIterator iter = clazz.newInstance();
+      TransformingIterator iter = clazz.newInstance();
       TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
       for (int row = 1; row <= 3; ++row) {
         for (int cf = 1; cf <= 3; ++cf) {
@@ -235,7 +235,7 @@ public class KeyTransformingIteratorTest
   
   @Test
   public void testReplaceKeyParts() throws Exception {
-    KeyTransformingIterator it = new IdentityKeyTransformingIterator();
+    TransformingIterator it = new IdentityKeyTransformingIterator();
     Key originalKey = new Key("r", "cf", "cq", "cv", 42);
     originalKey.setDeleted(true);
     
@@ -485,25 +485,32 @@ public class KeyTransformingIteratorTest
     return new Key(row, cf, cq, cv, ts);
   }
   
-  public static class IdentityKeyTransformingIterator extends KeyTransformingIterator {
+  public static class IdentityKeyTransformingIterator extends TransformingIterator {
     @Override
     protected PartialKey getKeyPrefix() {
       return PartialKey.ROW;
     }
     
     @Override
-    protected Key transformKey(Key originalKey) {
-      return originalKey;
-    };
+    protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException {
+      while (input.hasTop()) {
+        output.append(input.getTopKey(), input.getTopValue());
+        input.next();
+      }
+    }
   }
   
-  public static class DupeTransformingIterator extends KeyTransformingIterator {
-    @Override
-    protected Key transformKey(Key originalKey) {
-      Key ret = replaceKeyParts(originalKey, new Text("cf1"), new Text("cq1"), new Text(""));
-      ret.setTimestamp(5);
-      return ret;
-    };
+  public static class DupeTransformingIterator extends TransformingIterator {
+    @Override
+    protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException {
+      while (input.hasTop()) {
+        Key originalKey = input.getTopKey();
+        Key ret = replaceKeyParts(originalKey, new Text("cf1"), new Text("cq1"), new Text(""));
+        ret.setTimestamp(5);
+        output.append(ret, input.getTopValue());
+        input.next();
+      }
+    }
     
     @Override
     protected PartialKey getKeyPrefix() {
@@ -512,11 +519,16 @@ public class KeyTransformingIteratorTest
     
   }
 
-  public static abstract class ReversingKeyTransformingIterator extends KeyTransformingIterator
{
+  public static abstract class ReversingKeyTransformingIterator extends TransformingIterator
{
+    
     @Override
-    protected Key transformKey(Key originalKey) {
-      return reverseKeyPart(originalKey, getKeyPrefix());
-    };
+    protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException {
+      while (input.hasTop()) {
+        Key originalKey = input.getTopKey();
+        output.append(reverseKeyPart(originalKey, getKeyPrefix()), input.getTopValue());
+        input.next();
+      }
+    }
   }
   
   public static class ColFamReversingKeyTransformingIterator extends ReversingKeyTransformingIterator
{
@@ -562,15 +574,21 @@ public class KeyTransformingIteratorTest
     }
   }
   
-  public static class IllegalVisKeyTransformingIterator extends KeyTransformingIterator {
+  public static class IllegalVisKeyTransformingIterator extends TransformingIterator {
     @Override
     protected PartialKey getKeyPrefix() {
       return PartialKey.ROW_COLFAM_COLQUAL;
     }
-    
+
     @Override
-    protected Key transformKey(Key originalKey) {
-      return new Key(originalKey.getRow(), originalKey.getColumnFamily(), originalKey.getColumnQualifier(),
new Text("A&|||"), originalKey.getTimestamp());
+    protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException {
+      while (input.hasTop()) {
+        Key originalKey = input.getTopKey();
+        output.append(
+            new Key(originalKey.getRow(), originalKey.getColumnFamily(), originalKey.getColumnQualifier(),
new Text("A&|||"), originalKey.getTimestamp()),
+            input.getTopValue());
+        input.next();
+      }
     }
   }
 
@@ -582,15 +600,21 @@ public class KeyTransformingIteratorTest
     }
   }
 
-  public static class BadVisKeyTransformingIterator extends KeyTransformingIterator {
+  public static class BadVisKeyTransformingIterator extends TransformingIterator {
     @Override
     protected PartialKey getKeyPrefix() {
       return PartialKey.ROW_COLFAM_COLQUAL;
     }
     
     @Override
-    protected Key transformKey(Key originalKey) {
-      return new Key(originalKey.getRow(), originalKey.getColumnFamily(), originalKey.getColumnQualifier(),
new Text("badvis"), originalKey.getTimestamp());
+    protected void transformRange(SortedKeyValueIterator<Key,Value> input, KVBuffer
output) throws IOException {
+      while (input.hasTop()) {
+        Key originalKey = input.getTopKey();
+        output.append(
+            new Key(originalKey.getRow(), originalKey.getColumnFamily(), originalKey.getColumnQualifier(),
new Text("badvis"), originalKey.getTimestamp()),
+            input.getTopValue());
+        input.next();
+      }
     }
   }
   



Mime
View raw message