fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [2/3] incubator-fluo git commit: fixes #743 made data classes final
Date Fri, 29 Jul 2016 19:38:21 GMT
fixes #743 made data classes final


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/2805e2fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/2805e2fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/2805e2fa

Branch: refs/heads/master
Commit: 2805e2fadbcc1dedfcc9c422e642d39fefe3cdfc
Parents: 051a33b
Author: Keith Turner <keith@deenlo.com>
Authored: Thu Jul 28 17:32:55 2016 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Jul 29 15:31:51 2016 -0400

----------------------------------------------------------------------
 .../apache/fluo/accumulo/data/MutableBytes.java | 135 -------------------
 .../fluo/accumulo/data/MutableBytesFactory.java |  30 -----
 .../java/org/apache/fluo/api/data/Bytes.java    | 109 +++++++++------
 .../java/org/apache/fluo/api/data/Column.java   |   2 +-
 .../org/apache/fluo/api/data/ColumnValue.java   |   2 +-
 .../org/apache/fluo/api/data/RowColumn.java     |   2 +-
 .../apache/fluo/api/data/RowColumnValue.java    |  77 +++++++----
 .../java/org/apache/fluo/api/data/Span.java     |   2 +-
 .../org/apache/fluo/core/impl/Notification.java |  28 ++--
 .../apache/fluo/core/impl/TransactionImpl.java  |   6 +-
 .../java/org/apache/fluo/core/util/Hex.java     |   2 +-
 .../fluo/core/worker/NotificationProcessor.java |  10 +-
 .../finder/hash/HashNotificationFinder.java     |   6 +-
 .../apache/fluo/core/data/MutableBytesTest.java |  52 -------
 .../fluo/core/data/RowColumnValueTest.java      |   7 -
 .../fluo/mapreduce/FluoEntryInputFormat.java    |   2 +-
 16 files changed, 158 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java
deleted file mode 100644
index 898382f..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information
regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain
a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under
the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
- * or implied. See the License for the specific language governing permissions and limitations
under
- * the License.
- */
-
-package org.apache.fluo.accumulo.data;
-
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * An implementation of {@link Bytes} that is mutable and uses a backing byte array
- */
-public class MutableBytes extends Bytes implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  private final byte[] data;
-  private final int offset;
-  private final int length;
-
-  public MutableBytes() {
-    this.data = null;
-    this.offset = 0;
-    this.length = 0;
-  }
-
-  /**
-   * Creates a new MutableBytes. The given byte array is used directly as the backing array
so later
-   * changes made to the array reflect into the new sequence.
-   */
-  public MutableBytes(byte[] data) {
-    this.data = data;
-    this.offset = 0;
-    this.length = data.length;
-  }
-
-  /**
-   * Creates a new MutableBytes from a subsequence of the given byte array. The given byte
array is
-   * used directly as the backing array, so later changes made to the (relevant portion of
the)
-   * array reflect into the new sequence.
-   *
-   * @param data byte data
-   * @param offset starting offset in byte array (inclusive)
-   * @param length number of bytes to include in sequence
-   * @throws IllegalArgumentException if the offset or length are out of bounds for the given
byte
-   *         array
-   */
-  public MutableBytes(byte[] data, int offset, int length) {
-    if (offset < 0 || offset > data.length || length < 0 || (offset + length) > data.length) {
-      throw new IllegalArgumentException(" Bad offset and/or length data.length = " + data.length
-          + " offset = " + offset + " length = " + length);
-    }
-    this.data = data;
-    this.offset = offset;
-    this.length = length;
-  }
-
-  /**
-   * Creates a new MutableBytes from the given string. The bytes are determined from the
string
-   * using UTF-8 encoding
-   *
-   * @param s String to represent as Bytes
-   */
-  public MutableBytes(String s) {
-    this(s.getBytes(StandardCharsets.UTF_8));
-  }
-
-  /**
-   * Creates a new MutableBytes from the given string. The bytes are determined from the
string
-   * using the specified charset
-   *
-   * @param s String to represent as Bytes
-   * @param cs Charset
-   */
-  public MutableBytes(String s, Charset cs) {
-    this(s.getBytes(cs));
-  }
-
-  @Override
-  public byte byteAt(int i) {
-
-    if (i < 0) {
-      throw new IllegalArgumentException("i < 0, " + i);
-    }
-
-    if (i >= length) {
-      throw new IllegalArgumentException("i >= length, " + i + " >= " + length);
-    }
-
-    return data[offset + i];
-  }
-
-  @Override
-  public int length() {
-    return length;
-  }
-
-  @Override
-  public Bytes subSequence(int start, int end) {
-    if (start > end || start < 0 || end > length) {
-      throw new IllegalArgumentException("Bad start and/end start = " + start + " end=" +
end
-          + " offset=" + offset + " length=" + length);
-    }
-    return new MutableBytes(data, offset + start, end - start);
-  }
-
-  @Override
-  public byte[] toArray() {
-    byte[] copy = new byte[length];
-    System.arraycopy(data, offset, copy, 0, length);
-    return copy;
-  }
-
-  /**
-   * Creates UTF-8 String using Bytes data
-   */
-  @Override
-  public String toString() {
-    return new String(data, offset, length, StandardCharsets.UTF_8);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java
deleted file mode 100644
index a8b4dab..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information
regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain
a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under
the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
- * or implied. See the License for the specific language governing permissions and limitations
under
- * the License.
- */
-
-package org.apache.fluo.accumulo.data;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * An implementation of BytesFactory
- */
-public class MutableBytesFactory implements Bytes.BytesFactory {
-
-  @Override
-  public Bytes get(byte[] data) {
-    return new MutableBytes(data);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index b02e96a..3728243 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -37,19 +37,14 @@ import java.util.Objects;
  *
  * @since 1.0.0
  */
-public abstract class Bytes implements Comparable<Bytes>, Serializable {
+public final class Bytes implements Comparable<Bytes>, Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final String BYTES_FACTORY_CLASS =
-      "org.apache.fluo.accumulo.data.MutableBytesFactory";
   private static final String WRITE_UTIL_CLASS = "org.apache.fluo.accumulo.data.WriteUtilImpl";
 
-  /**
-   * @since 1.0.0
-   */
-  public interface BytesFactory {
-    Bytes get(byte[] data);
-  }
+  private final byte[] data;
+  private final int offset;
+  private final int length;
 
   /**
    * @since 1.0.0
@@ -60,13 +55,10 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable {
     int readVInt(DataInput stream) throws IOException;
   }
 
-  private static BytesFactory bytesFactory;
   private static WriteUtil writeUtil;
 
   static {
     try {
-      bytesFactory =
-          (BytesFactory) Class.forName(BYTES_FACTORY_CLASS).getDeclaredConstructor().newInstance();
       writeUtil =
           (WriteUtil) Class.forName(WRITE_UTIL_CLASS).getDeclaredConstructor().newInstance();
     } catch (Exception e) {
@@ -74,11 +66,25 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
     }
   }
 
-  public static final Bytes EMPTY = bytesFactory.get(new byte[0]);
+  public static final Bytes EMPTY = new Bytes(new byte[0]);
 
   private Integer hashCode = null;
 
-  public Bytes() {}
+  private Bytes(byte[] data) {
+    this.data = data;
+    this.offset = 0;
+    this.length = data.length;
+  }
+
+  private Bytes(byte[] data, int offset, int length) {
+    if (offset < 0 || offset > data.length || length < 0 || (offset + length) > data.length) {
+      throw new IndexOutOfBoundsException(" Bad offset and/or length data.length = " + data.length
+          + " offset = " + offset + " length = " + length);
+    }
+    this.data = data;
+    this.offset = offset;
+    this.length = length;
+  }
 
   /**
    * Gets a byte within this sequence of bytes
@@ -87,12 +93,25 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
    * @return byte
    * @throws IllegalArgumentException if i is out of range
    */
-  public abstract byte byteAt(int i);
+  public byte byteAt(int i) {
+
+    if (i < 0) {
+      throw new IndexOutOfBoundsException("i < 0, " + i);
+    }
+
+    if (i >= length) {
+      throw new IndexOutOfBoundsException("i >= length, " + i + " >= " + length);
+    }
+
+    return data[offset + i];
+  }
 
   /**
    * Gets the length of bytes
    */
-  public abstract int length();
+  public int length() {
+    return length;
+  }
 
   /**
    * Returns a portion of the Bytes object
@@ -100,17 +119,35 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
    * @param start index of subsequence start (inclusive)
    * @param end index of subsequence end (exclusive)
    */
-  public abstract Bytes subSequence(int start, int end);
+  public Bytes subSequence(int start, int end) {
+    if (start > end || start < 0 || end > length) {
+      throw new IllegalArgumentException("Bad start and/end start = " + start + " end=" + end
+          + " offset=" + offset + " length=" + length);
+    }
+    return new Bytes(data, offset + start, end - start);
+  }
 
   /**
    * Returns a byte array containing a copy of the bytes
    */
-  public abstract byte[] toArray();
+  public byte[] toArray() {
+    byte[] copy = new byte[length];
+    System.arraycopy(data, offset, copy, 0, length);
+    return copy;
+  }
 
   /**
-   * Compares the two given byte sequences, byte by byte, returning a negative, zero, or
positive
-   * result if the first sequence is less than, equal to, or greater than the second. The
comparison
-   * is performed starting with the first byte of each sequence, and proceeds until a pair
of bytes
+   * Creates UTF-8 String using Bytes data
+   */
+  @Override
+  public String toString() {
+    return new String(data, offset, length, StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Compares this to the given bytes, byte by byte, returning a negative, zero, or positive result
+   * if the first sequence is less than, equal to, or greater than the second. The comparison is
+   * performed starting with the first byte of each sequence, and proceeds until a pair of bytes
    * differs, or one sequence runs out of byte (is shorter). A shorter sequence is considered less
    * than a longer one.
    *
@@ -118,27 +155,19 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
    * @param b2 second byte sequence to compare
    * @return comparison result
    */
-  public static final int compareBytes(Bytes b1, Bytes b2) {
-
-    int minLen = Math.min(b1.length(), b2.length());
+  @Override
+  public final int compareTo(Bytes other) {
+    int minLen = Math.min(this.length(), other.length());
 
     for (int i = 0; i < minLen; i++) {
-      int a = (b1.byteAt(i) & 0xff);
-      int b = (b2.byteAt(i) & 0xff);
+      int a = (this.byteAt(i) & 0xff);
+      int b = (other.byteAt(i) & 0xff);
 
       if (a != b) {
         return a - b;
       }
     }
-    return b1.length() - b2.length();
-  }
-
-  /**
-   * Compares this Bytes object to another.
-   */
-  @Override
-  public final int compareTo(Bytes other) {
-    return compareBytes(this, other);
+    return this.length() - other.length();
   }
 
   /**
@@ -184,7 +213,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
     }
     byte[] copy = new byte[array.length];
     System.arraycopy(array, 0, copy, 0, array.length);
-    return bytesFactory.get(copy);
+    return new Bytes(copy);
   }
 
   /**
@@ -201,7 +230,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
     }
     byte[] copy = new byte[length];
     System.arraycopy(data, offset, copy, 0, length);
-    return bytesFactory.get(copy);
+    return new Bytes(copy);
   }
 
   /**
@@ -215,7 +244,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
     byte[] data = new byte[bb.remaining()];
     // duplicate so that it does not change position
     bb.duplicate().get(data);
-    return bytesFactory.get(data);
+    return new Bytes(data);
   }
 
   /**
@@ -227,7 +256,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
       return EMPTY;
     }
     byte[] data = s.getBytes(StandardCharsets.UTF_8);
-    return bytesFactory.get(data);
+    return new Bytes(data);
   }
 
   /**
@@ -240,7 +269,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable
{
       return EMPTY;
     }
     byte[] data = s.getBytes(c);
-    return bytesFactory.get(data);
+    return new Bytes(data);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
index 8d17a93..8876249 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java
@@ -25,7 +25,7 @@ import java.util.Objects;
  *
  * @since 1.0.0
  */
-public class Column implements Comparable<Column>, Serializable {
+public final class Column implements Comparable<Column>, Serializable {
 
   private static final long serialVersionUID = 1L;
   public static final Bytes UNSET = Bytes.of(new byte[0]);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
index 4080e8c..dc65057 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
  * @since 1.0.0
  */
 
-public class ColumnValue implements Serializable, Comparable<ColumnValue> {
+public final class ColumnValue implements Serializable, Comparable<ColumnValue> {
   private static final long serialVersionUID = 1L;
 
   private Column column;

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
index 1fa43d7..48b18b3 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
@@ -24,7 +24,7 @@ import java.util.Objects;
  *
  * @since 1.0.0
  */
-public class RowColumn implements Comparable<RowColumn>, Serializable {
+public final class RowColumn implements Comparable<RowColumn>, Serializable {
 
   private static final long serialVersionUID = 1L;
   public static RowColumn EMPTY = new RowColumn();

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
index de32948..62968be 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java
@@ -15,18 +15,24 @@
 
 package org.apache.fluo.api.data;
 
+import java.io.Serializable;
+import java.util.Objects;
+
 /**
  * An immutable object that can hold a row, column, and value.
  *
  * @since 1.0.0
  */
-public class RowColumnValue extends RowColumn {
+public final class RowColumnValue implements Comparable<RowColumnValue>, Serializable {
   private static final long serialVersionUID = 1L;
 
+  private Bytes row = Bytes.EMPTY;
+  private Column col = Column.EMPTY;
   private Bytes val = Bytes.EMPTY;
 
   public RowColumnValue(Bytes row, Column col, Bytes val) {
-    super(row, col);
+    this.row = row;
+    this.col = col;
     this.val = val;
   }
 
@@ -35,10 +41,38 @@ public class RowColumnValue extends RowColumn {
    * @param val (will be UTF-8 encoded)
    */
   public RowColumnValue(String row, Column col, String val) {
-    super(Bytes.of(row), col);
+    this.row = Bytes.of(row);
+    this.col = col;
     this.val = Bytes.of(val);
   }
 
+  /**
+   * Retrieves Row in RowColumn
+   *
+   * @return Row
+   */
+  public Bytes getRow() {
+    return row;
+  }
+
+  /**
+   * Retrieves Row in RowColumn as a String using UTF-8 encoding.
+   *
+   * @return Row
+   */
+  public String getsRow() {
+    return row.toString();
+  }
+
+  /**
+   * Retrieves Column in RowColumn
+   *
+   * @return Column
+   */
+  public Column getColumn() {
+    return col;
+  }
+
   public Bytes getValue() {
     return val;
   }
@@ -47,9 +81,13 @@ public class RowColumnValue extends RowColumn {
     return val.toString();
   }
 
+  public RowColumn getRowColumn() {
+    return new RowColumn(row, col);
+  }
+
   @Override
   public int hashCode() {
-    return super.hashCode() + 31 * val.hashCode();
+    return Objects.hash(row, col, val);
   }
 
   @Override
@@ -60,34 +98,25 @@ public class RowColumnValue extends RowColumn {
 
     if (o instanceof RowColumnValue) {
       RowColumnValue orcv = (RowColumnValue) o;
-
-      if (super.equals(orcv)) {
-        return val.equals(orcv.val);
-      }
+      return row.equals(orcv.row) && col.equals(orcv.col) && val.equals(orcv.val);
     }
     return false;
   }
 
   @Override
-  public int compareTo(RowColumn orc) {
-    if (orc == this) {
-      return 0;
-    }
-
-    if (!(orc instanceof RowColumnValue)) {
-      throw new IllegalArgumentException("Can only compare to same type");
-    }
+  public String toString() {
+    return getRowColumn() + " " + val;
+  }
 
-    int result = super.compareTo(orc);
+  @Override
+  public int compareTo(RowColumnValue o) {
+    int result = row.compareTo(o.row);
     if (result == 0) {
-      RowColumnValue orcv = (RowColumnValue) orc;
-      result = val.compareTo(orcv.val);
+      result = col.compareTo(o.col);
+      if (result == 0) {
+        result = val.compareTo(o.val);
+      }
     }
     return result;
   }
-
-  @Override
-  public String toString() {
-    return super.toString() + " " + val;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
index 6ac084b..a5a9f94 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java
@@ -24,7 +24,7 @@ import java.util.Objects;
  *
  * @since 1.0.0
  */
-public class Span implements Serializable {
+public final class Span implements Serializable {
 
   private static final long serialVersionUID = 1L;
   private RowColumn start = RowColumn.EMPTY;

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
index 4db011d..17c3e00 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java
@@ -39,13 +39,13 @@ import static org.apache.fluo.accumulo.util.NotificationUtil.isDelete;
  * See {@link NotificationIterator} for explanation of notification timestamp serialization.
  *
  */
-public class Notification extends RowColumn {
-  private static final long serialVersionUID = 1L;
+public class Notification {
 
-  private long timestamp;
+  private final RowColumn rowCol;
+  private final long timestamp;
 
   public Notification(Bytes row, Column col, long ts) {
-    super(row, col);
+    rowCol = new RowColumn(row, col);
     this.timestamp = ts;
   }
 
@@ -53,15 +53,27 @@ public class Notification extends RowColumn {
     return timestamp;
   }
 
+  public Bytes getRow() {
+    return rowCol.getRow();
+  }
+
+  public Column getColumn() {
+    return rowCol.getColumn();
+  }
+
+  public RowColumn getRowColumn() {
+    return rowCol;
+  }
+
   public Flutation newDelete(Environment env) {
     return newDelete(env, getTimestamp());
   }
 
   public Flutation newDelete(Environment env, long ts) {
-    Flutation m = new Flutation(env, getRow());
-    ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(getColumn());
-    m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(getColumn()), cv, encodeTs(ts, true),
-        TransactionImpl.EMPTY);
+    Flutation m = new Flutation(env, rowCol.getRow());
+    ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(rowCol.getColumn());
+    m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(rowCol.getColumn()), cv,
+        encodeTs(ts, true), TransactionImpl.EMPTY);
     return m;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index ca84807..f25099b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -510,11 +510,11 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
 
         if (notification.getColumn().equals(col)) {
           // check to see if ACK exist after notification
-          Key startKey = SpanUtil.toKey(notification);
+          Key startKey = SpanUtil.toKey(notification.getRowColumn());
           startKey.setTimestamp(ColumnConstants.ACK_PREFIX
               | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK));
 
-          Key endKey = SpanUtil.toKey(notification);
+          Key endKey = SpanUtil.toKey(notification.getRowColumn());
           endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1));
 
           Range range = new Range(startKey, endKey);
@@ -786,7 +786,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot {
     if (primary != null) {
       primRow = primary.getRow();
       primCol = primary.getColumn();
-      if (notification != null && !primary.equals(notification)) {
+      if (notification != null && !primary.equals(notification.getRowColumn())) {
         throw new IllegalArgumentException("Primary must be notification");
       }
     } else if (notification != null) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
index f08a4ea..f9fc85f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
@@ -73,7 +73,7 @@ public class Hex {
 
   public static String encNonAscii(Notification n) {
     StringBuilder sb = new StringBuilder();
-    encNonAscii(sb, n, " ");
+    encNonAscii(sb, n.getRowColumn(), " ");
     sb.append(" ");
     sb.append(n.getTimestamp());
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index e39bfbf..1834835 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -182,14 +182,14 @@ public class NotificationProcessor implements AutoCloseable {
         new WorkTaskAsync(this, notificationFinder, env, notification, observers);
     FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
 
-    if (!tracker.add(notification, ft)) {
+    if (!tracker.add(notification.getRowColumn(), ft)) {
       return false;
     }
 
     try {
       executor.execute(ft);
     } catch (RejectedExecutionException rje) {
-      tracker.remove(notification);
+      tracker.remove(notification.getRowColumn());
       throw rje;
     }
 
@@ -203,18 +203,18 @@ public class NotificationProcessor implements AutoCloseable {
         new WorkTaskAsync(this, notificationFinder, env, notification, observers);
     FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
 
-    if (tracker.requeue(notification, ft)) {
+    if (tracker.requeue(notification.getRowColumn(), ft)) {
       try {
         executor.execute(ft);
       } catch (RejectedExecutionException rje) {
-        tracker.remove(notification);
+        tracker.remove(notification.getRowColumn());
         throw rje;
       }
     }
   }
 
   public void notificationProcessed(final Notification notification) {
-    tracker.remove(notification);
+    tracker.remove(notification.getRowColumn());
   }
 
   public int size() {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
index b71954b..e18ec2f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
@@ -34,7 +34,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.fluo.accumulo.data.MutableBytes;
 import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
 import org.apache.fluo.accumulo.util.NotificationUtil;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -193,9 +192,8 @@ public class HashNotificationFinder implements NotificationFinder {
   @VisibleForTesting
   static boolean shouldProcess(Notification notification, int divisor, int remainder) {
     byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn());
-    return NotificationHashFilter.accept(
-        ByteUtil.toByteSequence((MutableBytes) notification.getRow()), new ArrayByteSequence(cfcq),
-        divisor, remainder);
+    return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()),
+        new ArrayByteSequence(cfcq), divisor, remainder);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java b/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java
deleted file mode 100644
index c6f4a1d..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information
regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain
a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under
the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
- * or implied. See the License for the specific language governing permissions and limitations
under
- * the License.
- */
-
-package org.apache.fluo.core.data;
-
-import org.apache.fluo.accumulo.data.MutableBytes;
-import org.apache.fluo.api.data.Bytes;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit test for {@link MutableBytes}
- */
-public class MutableBytesTest {
-
-  @Test
-  public void testImmutableBytes() {
-    byte[] d1 = Bytes.of("mydata").toArray();
-
-    MutableBytes mutable = new MutableBytes(d1);
-    Assert.assertNotSame(d1, mutable.toArray());
-
-    Bytes immutable = Bytes.of(d1);
-    Assert.assertNotSame(d1, immutable.toArray());
-    Assert.assertEquals(mutable, immutable);
-    Assert.assertNotSame(mutable, immutable);
-
-    Bytes read = mutable;
-    Assert.assertEquals(read, immutable);
-    Assert.assertSame(read, mutable);
-    Assert.assertEquals(read, mutable);
-    Assert.assertNotSame(d1, read.toArray());
-
-    MutableBytes write = (MutableBytes) immutable;
-    Assert.assertEquals(write, mutable);
-    Assert.assertNotSame(write, mutable);
-    byte[] d2 = write.toArray();
-    Assert.assertNotSame(d2, write.toArray());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java
b/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java
index f7a47d4..63348b1 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -85,10 +84,4 @@ public class RowColumnValueTest {
   public void testToString() {
     Assert.assertEquals("row1 fam1 qual1  a", rcv1.toString());
   }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testCompareRowColumn() {
-    rcv1.compareTo(new RowColumn("foo"));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
----------------------------------------------------------------------
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index 18c091c..d8ea54b 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -76,7 +76,7 @@ public class FluoEntryInputFormat extends InputFormat<RowColumn, Bytes> {
 
       @Override
       public RowColumn getCurrentKey() throws IOException, InterruptedException {
-        return rowColVal;
+        return rowColVal.getRowColumn();
       }
 
       @Override


Mime
View raw message