accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/9] git commit: ACCUMULO-2825 Add RowEncodingIterator
Date Wed, 21 May 2014 03:01:25 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6.1-SNAPSHOT 244c1ab71 -> 1193f4b84
  refs/heads/master b510b766b -> f131ac6e7


ACCUMULO-2825 Add RowEncodingIterator

The WholeRowIterator now extends the abstract RowEncodingIterator


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73a9e6cf
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73a9e6cf
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73a9e6cf

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: 73a9e6cfcad6c836ed9eb95421bc7306037739a9
Parents: 244c1ab
Author: Ryan Leary <rleary@bbn.com>
Authored: Sat May 17 21:03:03 2014 -0400
Committer: Ryan Leary <rleary@bbn.com>
Committed: Sat May 17 21:10:52 2014 -0400

----------------------------------------------------------------------
 .../iterators/user/RowEncodingIterator.java     | 172 +++++++++++++++++++
 .../core/iterators/user/WholeRowIterator.java   | 130 ++------------
 2 files changed, 191 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
new file mode 100644
index 0000000..dff1e04
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
+ * pairs into a single key/value pair, which is returned through the client as an atomic operation. Subclasses choose the encoding via rowEncoder/rowDecoder.
+ * 
+ * <p>
+ * One caveat is that when seeking in a RowEncodingIterator using a range that starts at a non-inclusive first key in a row (e.g. seek(new Range(new Key(new
+ * Text("row")),false,...),...)), this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when the system
+ * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
+ * scan after swapping out sources.
+ * 
+ * <p>
+ * To regain the original key/value pairs of the row, call the {@link #rowDecoder} function on the key/value pair that this iterator returned.
+ * 
+ * @see RowFilter
+ */
+public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  protected SortedKeyValueIterator<Key,Value> sourceIter;
+  private Key topKey = null;
+  private Value topValue = null;
+
+  // Subclasses implement rowEncoder and its inverse rowDecoder; rowDecoder must recover what rowEncoder encoded.
+  /**
+   * Given a value generated by the rowEncoder implementation, recreate the
+   * original Key, Value pairs.
+   * @param rowKey the key this iterator returned for the row (only its row portion is set)
+   * @param rowValue the value produced by {@link #rowEncoder} for that row
+   * @return the original key/value pairs of the row
+   * @throws IOException if rowValue cannot be decoded
+   */
+  public abstract SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException;
+  
+  /**
+   * Encode all keys and values of a single row into one Value, such that all portions of
+   * each key (except for the row, which the returned top key carries) and each original
+   * value can be recovered by {@link #rowDecoder}.
+   * @param keys the keys of the row, ordered as they are given by the source iterator
+   * @param values one value for each key in keys, in the same order
+   * @return a single Value encoding the whole row
+   * @throws IOException if the row cannot be encoded
+   */
+  public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;
+  
+  /**
+   * Implement deepCopy. Ensure sourceIter is deep-copied (when set) so the copy does not share this iterator's source.
+   */
+  @Override
+  public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env);
+  
+  List<Key> keys = new ArrayList<Key>(); // buffered keys of the row currently being assembled
+  List<Value> values = new ArrayList<Value>(); // values parallel to keys
+  
+  private void prepKeys() throws IOException {
+    if (topKey != null)
+      return;
+    Text currentRow;
+    do {
+      if (!sourceIter.hasTop())
+        return;
+      currentRow = new Text(sourceIter.getTopKey().getRow());
+      keys.clear();
+      values.clear();
+      while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
+        keys.add(new Key(sourceIter.getTopKey()));
+        values.add(new Value(sourceIter.getTopValue()));
+        sourceIter.next();
+      }
+    } while (!filter(currentRow, keys, values));
+    
+    topKey = new Key(currentRow);
+    topValue = rowEncoder(keys, values);
+  }
+  
+  /**
+   * Decide whether a fully buffered row should be encoded and returned; override to skip rows. The default keeps every row.
+   * @param currentRow
+   *          All keys have this in their row portion (do not modify!).
+   * @param keys
+   *          One key for each key in the row, ordered as they are given by the source iterator (do not modify!).
+   * @param values
+   *          One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!).
+   * @return true if we want to keep the row, false if we want to skip it
+   */
+  protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
+    return true;
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return topKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return topValue;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return topKey != null;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    sourceIter = source;
+  }
+  
+  @Override
+  public void next() throws IOException {
+    topKey = null;
+    topValue = null;
+    prepKeys();
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    topKey = null;
+    topValue = null;
+    
+    Key sk = range.getStartKey();
+    
+    if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
+        && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
+      // the start key has the shape of a key this iterator returns (row only) and is exclusive,
+      // so assume the caller already consumed that row and resume at the following row
+      Key followingRowKey = sk.followingKey(PartialKey.ROW);
+      if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
+        return;
+      
+      range = new Range(followingRowKey, true, range.getEndKey(), range.isEndKeyInclusive());
+    }
+    
+    sourceIter.seek(range, columnFamilies, inclusive);
+    prepKeys();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
index 525f27c..c8bceea 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java
@@ -21,21 +21,15 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.hadoop.io.Text;
 
 /**
  * 
@@ -53,19 +47,29 @@ import org.apache.hadoop.io.Text;
  * 
  * @see RowFilter
  */
-public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
-  
-  private SortedKeyValueIterator<Key,Value> sourceIter;
-  private Key topKey = null;
-  private Value topValue = null;
-  
-  public WholeRowIterator() {
-    
-  }
+public class WholeRowIterator extends RowEncodingIterator {
+  public WholeRowIterator() { super(); } // sourceIter stays unset until init() supplies a source
   
   WholeRowIterator(SortedKeyValueIterator<Key,Value> source) {
     this.sourceIter = source;
   }
+  
+  /** Deep-copies this iterator, including its source iterator when one has been set. */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    // an instance built with the no-arg constructor and never initialized has no source to copy
+    return sourceIter == null ? new WholeRowIterator() : new WholeRowIterator(sourceIter.deepCopy(env));
+  }
+
+  @Override
+  public SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException {
+    return decodeRow(rowKey, rowValue); // keep WholeRowIterator's existing decodeRow format
+  }
+
+  @Override
+  public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException {
+    return encodeRow(keys, values); // keep WholeRowIterator's existing encodeRow format
+  }
 
   /**
   * Returns the byte array containing the field of row key from the given DataInputStream din.
@@ -139,100 +143,4 @@ public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
     
     return new Value(out.toByteArray());
   }
-  
-  List<Key> keys = new ArrayList<Key>();
-  List<Value> values = new ArrayList<Value>();
-  
-  private void prepKeys() throws IOException {
-    if (topKey != null)
-      return;
-    Text currentRow;
-    do {
-      if (sourceIter.hasTop() == false)
-        return;
-      currentRow = new Text(sourceIter.getTopKey().getRow());
-      keys.clear();
-      values.clear();
-      while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow))
{
-        keys.add(new Key(sourceIter.getTopKey()));
-        values.add(new Value(sourceIter.getTopValue()));
-        sourceIter.next();
-      }
-    } while (!filter(currentRow, keys, values));
-    
-    topKey = new Key(currentRow);
-    topValue = encodeRow(keys, values);
-    
-  }
-  
-  /**
-   * 
-   * @param currentRow
-   *          All keys have this in their row portion (do not modify!).
-   * @param keys
-   *          One key for each key in the row, ordered as they are given by the source iterator
(do not modify!).
-   * @param values
-   *          One value for each key in keys, ordered to correspond to the ordering in keys
(do not modify!).
-   * @return true if we want to keep the row, false if we want to skip it
-   */
-  protected boolean filter(Text currentRow, List<Key> keys, List<Value> values)
{
-    return true;
-  }
-  
-  @Override
-  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    if (sourceIter != null)
-      return new WholeRowIterator(sourceIter.deepCopy(env));
-    return new WholeRowIterator();
-  }
-  
-  @Override
-  public Key getTopKey() {
-    return topKey;
-  }
-  
-  @Override
-  public Value getTopValue() {
-    return topValue;
-  }
-  
-  @Override
-  public boolean hasTop() {
-    return topKey != null;
-  }
-  
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) throws IOException {
-    sourceIter = source;
-  }
-  
-  @Override
-  public void next() throws IOException {
-    topKey = null;
-    topValue = null;
-    prepKeys();
-  }
-  
-  @Override
-  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
-    topKey = null;
-    topValue = null;
-    
-    Key sk = range.getStartKey();
-    
-    if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length()
== 0 && sk.getColumnVisibilityData().length() == 0
-        && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive())
{
-      // assuming that we are seeking using a key previously returned by this iterator
-      // therefore go to the next row
-      Key followingRowKey = sk.followingKey(PartialKey.ROW);
-      if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey())
> 0)
-        return;
-      
-      range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
-    }
-    
-    sourceIter.seek(range, columnFamilies, inclusive);
-    prepKeys();
-  }
-  
 }


Mime
View raw message