accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/3] accumulo git commit: ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter
Date Wed, 06 May 2015 21:07:05 GMT
ACCUMULO-3761 RowEncodingIterator should take a maximum buffer size parameter

Added the optional parameter maxBufferSize to the RowEncodingIterator. This parameter specifies
the maximum size to which the RowEncodingIterator's buffer can grow as it collects the key/value
pairs to be encoded. The name, encoding and behaviour of the maxBufferSize parameter match those of the TransformingIterator.

Discussion is here: http://www.mail-archive.com/dev%40accumulo.apache.org/msg09821.html

Added maxBufferSize parameter for RowEncodingIterator based on implementation of TransformingIterator;
still needs tests

Signed-off-by: Josh Elser <elserj@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfa5255c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfa5255c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfa5255c

Branch: refs/heads/master
Commit: dfa5255cee3219cfaf560e949386da23bd9135d6
Parents: 2938cfa
Author: Russ Weeks <rweeks@newbrightidea.com>
Authored: Fri Apr 10 00:10:32 2015 -0700
Committer: Josh Elser <elserj@apache.org>
Committed: Wed May 6 16:43:14 2015 -0400

----------------------------------------------------------------------
 .../iterators/user/RowEncodingIterator.java     |  56 +++++-
 .../core/iterators/user/WholeRowIterator.java   |   7 -
 .../iterators/user/RowEncodingIteratorTest.java | 201 +++++++++++++++++++
 3 files changed, 253 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
index f776569..8a36bef 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -22,14 +22,18 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
+import java.util.HashMap;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.commons.collections.BufferOverflowException;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -52,11 +56,15 @@ import org.apache.hadoop.io.Text;
  *
  * @see RowFilter
  */
-public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value>
{
+public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value>,
OptionDescriber {
+
+  public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
+  private static final long DEFAULT_MAX_BUFFER_SIZE = Long.MAX_VALUE;
 
   protected SortedKeyValueIterator<Key,Value> sourceIter;
   private Key topKey = null;
   private Value topValue = null;
+  private long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
 
   // decode a bunch of key value pairs that have been encoded into a single value
   /**
@@ -71,12 +79,23 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
   public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws
IOException;
 
   @Override
-  public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    RowEncodingIterator newInstance;
+    try {
+      newInstance = this.getClass().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    newInstance.sourceIter = sourceIter.deepCopy(env);
+    newInstance.maxBufferSize = maxBufferSize;
+    return newInstance;
+  }
 
   List<Key> keys = new ArrayList<Key>();
   List<Value> values = new ArrayList<Value>();
 
   private void prepKeys() throws IOException {
+    long kvBufSize = 0;
     if (topKey != null)
       return;
     Text currentRow;
@@ -87,8 +106,14 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
       keys.clear();
       values.clear();
       while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow))
{
-        keys.add(new Key(sourceIter.getTopKey()));
-        values.add(new Value(sourceIter.getTopValue()));
+        Key sourceTopKey = sourceIter.getTopKey();
+        Value sourceTopValue = sourceIter.getTopValue();
+        keys.add(new Key(sourceTopKey));
+        values.add(new Value(sourceTopValue));
+        kvBufSize += sourceTopKey.getSize() + sourceTopValue.getSize() + 128;
+        if (kvBufSize > maxBufferSize) {
+          throw new BufferOverflowException("Exceeded buffer size of " + maxBufferSize +
" for row: " + sourceTopKey.getRow().toString());
+        }
         sourceIter.next();
       }
     } while (!filter(currentRow, keys, values));
@@ -129,6 +154,29 @@ public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) throws IOException {
     sourceIter = source;
+    if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
+      maxBufferSize = AccumuloConfiguration.getMemoryInBytes(options.get(MAX_BUFFER_SIZE_OPT));
+    }
+  }
+
+  @Override
+  public IteratorOptions describeOptions() {
+    String desc = "This iterator encapsulates an entire row of Key/Value pairs into a single
Key/Value pair.";
+    String bufferDesc = "Maximum buffer size (in accumulo memory spec) to use for buffering
keys before throwing a BufferOverflowException.";
+    HashMap<String,String> namedOptions = new HashMap<String,String>();
+    namedOptions.put(MAX_BUFFER_SIZE_OPT, bufferDesc);
+    return new IteratorOptions(getClass().getSimpleName(), desc, namedOptions, null);
+  }
+
+  @Override
+  public boolean validateOptions(Map<String, String> options) {
+    String maxBufferSizeStr = options.get(MAX_BUFFER_SIZE_OPT);
+    try {
+      AccumuloConfiguration.getMemoryInBytes(maxBufferSizeStr);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to parse opt " + MAX_BUFFER_SIZE_OPT + "
" + maxBufferSizeStr, e);
+    }
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 7c47ec3..9f5a6b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -57,13 +57,6 @@ public class WholeRowIterator extends RowEncodingIterator {
   }
 
   @Override
-  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    if (sourceIter != null)
-      return new WholeRowIterator(sourceIter.deepCopy(env));
-    return new WholeRowIterator();
-  }
-
-  @Override
   public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException
{
     return decodeRow(rowKey, rowValue);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa5255c/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
new file mode 100644
index 0000000..8f228f5
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/RowEncodingIteratorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.collections.BufferOverflowException;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RowEncodingIteratorTest {
+
+  private static final class DummyIteratorEnv implements IteratorEnvironment {
+    @Override
+    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName)
throws IOException {
+      return null;
+    }
+
+    @Override
+    public AccumuloConfiguration getConfig() {
+      return null;
+    }
+
+    @Override
+    public IteratorUtil.IteratorScope getIteratorScope() {
+      return IteratorUtil.IteratorScope.scan;
+    }
+
+    @Override
+    public boolean isFullMajorCompaction() {
+      return false;
+    }
+
+    @Override
+    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+
+    }
+
+    @Override
+    public Authorizations getAuthorizations() {
+      return null;
+    }
+  }
+
+  private static final class RowEncodingIteratorImpl extends RowEncodingIterator {
+
+    public static SortedMap<Key,Value> decodeRow(Key rowKey, Value rowValue) throws
IOException {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(rowValue.get()));
+      int numKeys = dis.readInt();
+      List<Key> decodedKeys = new ArrayList<Key>();
+      List<Value> decodedValues = new ArrayList<Value>();
+      SortedMap<Key,Value> out = new TreeMap<Key,Value>();
+      for (int i = 0; i < numKeys; i++) {
+        Key k = new Key();
+        k.readFields(dis);
+        decodedKeys.add(k);
+      }
+      int numValues = dis.readInt();
+      for (int i = 0; i < numValues; i++) {
+        Value v = new Value();
+        v.readFields(dis);
+        decodedValues.add(v);
+      }
+      if (decodedKeys.size() != decodedValues.size()) {
+        throw new IOException("Number of keys doesn't match number of values");
+      }
+      for (int i = 0; i < decodedKeys.size(); i++) {
+        out.put(decodedKeys.get(i), decodedValues.get(i));
+      }
+      return out;
+    }
+
+    @Override
+    public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException
{
+      return decodeRow(rowKey, rowValue);
+    }
+
+    @Override
+    public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException
{
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      dos.writeInt(keys.size());
+      for (Key key : keys) {
+        key.write(dos);
+      }
+      dos.writeInt(values.size());
+      for (Value v : values) {
+        v.write(dos);
+      }
+      dos.flush();
+      return new Value(baos.toByteArray());
+    }
+  }
+
+  private void pkv(SortedMap<Key,Value> map, String row, String cf, String cq, String
cv, long ts, byte[] val) {
+    map.put(new Key(new Text(row), new Text(cf), new Text(cq), new Text(cv), ts), new Value(val,
true));
+  }
+
+  @Test
+  public void testEncodeAll() throws IOException {
+    byte[] kbVal = new byte[1024];
+    // This code is shamelessly borrowed from the WholeRowIteratorTest.
+    SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+    pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+    pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+    SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+    pkv(map2, "row2", "cf1", "cq1", "cv1", 5, kbVal);
+    pkv(map2, "row2", "cf1", "cq2", "cv1", 6, kbVal);
+
+    SortedMap<Key,Value> map3 = new TreeMap<Key,Value>();
+    pkv(map3, "row3", "cf1", "cq1", "cv1", 5, kbVal);
+    pkv(map3, "row3", "cf1", "cq2", "cv1", 6, kbVal);
+
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    map.putAll(map1);
+    map.putAll(map2);
+    map.putAll(map3);
+    SortedMapIterator src = new SortedMapIterator(map);
+    Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+    RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+    Map<String,String> bigBufferOpts = new HashMap<String,String>();
+    bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "3K");
+    iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+    assertTrue(iter.hasTop());
+    assertEquals(map1, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+    // simulate something continuing using the last key from the iterator
+    // this is what client and server code will do
+    range = new Range(iter.getTopKey(), false, range.getEndKey(), range.isEndKeyInclusive());
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+
+    assertTrue(iter.hasTop());
+    assertEquals(map2, RowEncodingIteratorImpl.decodeRow(iter.getTopKey(), iter.getTopValue()));
+
+    iter.next();
+
+    assertFalse(iter.hasTop());
+  }
+
+  @Test(expected = BufferOverflowException.class)
+  public void testEncodeSome() throws IOException {
+    byte[] kbVal = new byte[1024];
+    // This code is shamelessly borrowed from the WholeRowIteratorTest.
+    SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+    pkv(map1, "row1", "cf1", "cq1", "cv1", 5, kbVal);
+    pkv(map1, "row1", "cf1", "cq2", "cv1", 6, kbVal);
+
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    map.putAll(map1);
+    SortedMapIterator src = new SortedMapIterator(map);
+    Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+    RowEncodingIteratorImpl iter = new RowEncodingIteratorImpl();
+    Map<String,String> bigBufferOpts = new HashMap<String,String>();
+    bigBufferOpts.put(RowEncodingIterator.MAX_BUFFER_SIZE_OPT, "1K");
+    iter.init(src, bigBufferOpts, new DummyIteratorEnv());
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+    // BufferOverflowException should be thrown as RowEncodingIterator can't fit the whole
row into its buffer.
+  }
+}


Mime
View raw message