hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject git commit: HBASE-11620 Record the class name of Writer in WAL header so that only proper Reader can open the WAL file (Ted Yu)
Date Fri, 01 Aug 2014 17:15:41 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 85e317b3a -> e14296109


HBASE-11620 Record the class name of Writer in WAL header so that only proper Reader can open the WAL file (Ted Yu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1429610
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1429610
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1429610

Branch: refs/heads/branch-1
Commit: e142961099cda5b3f733cd2239cb22ce150f5c08
Parents: 85e317b
Author: Ted Yu <tedyu@apache.org>
Authored: Fri Aug 1 17:15:32 2014 +0000
Committer: Ted Yu <tedyu@apache.org>
Committed: Fri Aug 1 17:15:32 2014 +0000

----------------------------------------------------------------------
 .../hbase/protobuf/generated/WALProtos.java     | 226 ++++++++++++++++---
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../hbase/regionserver/wal/HLogFactory.java     |   4 +
 .../regionserver/wal/ProtobufLogReader.java     |  39 +++-
 .../regionserver/wal/ProtobufLogWriter.java     |   3 +
 .../wal/SecureProtobufLogReader.java            |  18 +-
 .../wal/SecureProtobufLogWriter.java            |   1 +
 .../wal/TestHLogReaderOnSecureHLog.java         | 187 +++++++++++++++
 8 files changed, 441 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 19f5690..3d0d1d0 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -122,6 +122,21 @@ public final class WALProtos {
      * <code>optional bool has_tag_compression = 3;</code>
      */
     boolean getHasTagCompression();
+
+    // optional string writer_cls_name = 4;
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    boolean hasWriterClsName();
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    java.lang.String getWriterClsName();
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    com.google.protobuf.ByteString
+        getWriterClsNameBytes();
   }
   /**
    * Protobuf type {@code WALHeader}
@@ -189,6 +204,11 @@ public final class WALProtos {
               hasTagCompression_ = input.readBool();
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              writerClsName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -277,10 +297,54 @@ public final class WALProtos {
       return hasTagCompression_;
     }
 
+    // optional string writer_cls_name = 4;
+    public static final int WRITER_CLS_NAME_FIELD_NUMBER = 4;
+    private java.lang.Object writerClsName_;
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    public boolean hasWriterClsName() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    public java.lang.String getWriterClsName() {
+      java.lang.Object ref = writerClsName_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          writerClsName_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string writer_cls_name = 4;</code>
+     */
+    public com.google.protobuf.ByteString
+        getWriterClsNameBytes() {
+      java.lang.Object ref = writerClsName_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        writerClsName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       hasCompression_ = false;
       encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
       hasTagCompression_ = false;
+      writerClsName_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -303,6 +367,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(3, hasTagCompression_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getWriterClsNameBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -324,6 +391,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(3, hasTagCompression_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getWriterClsNameBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -362,6 +433,11 @@ public final class WALProtos {
         result = result && (getHasTagCompression()
             == other.getHasTagCompression());
       }
+      result = result && (hasWriterClsName() == other.hasWriterClsName());
+      if (hasWriterClsName()) {
+        result = result && getWriterClsName()
+            .equals(other.getWriterClsName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -387,6 +463,10 @@ public final class WALProtos {
         hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getHasTagCompression());
       }
+      if (hasWriterClsName()) {
+        hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getWriterClsName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -502,6 +582,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000002);
         hasTagCompression_ = false;
         bitField0_ = (bitField0_ & ~0x00000004);
+        writerClsName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -542,6 +624,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.hasTagCompression_ = hasTagCompression_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.writerClsName_ = writerClsName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -567,6 +653,11 @@ public final class WALProtos {
         if (other.hasHasTagCompression()) {
           setHasTagCompression(other.getHasTagCompression());
         }
+        if (other.hasWriterClsName()) {
+          bitField0_ |= 0x00000008;
+          writerClsName_ = other.writerClsName_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -696,6 +787,80 @@ public final class WALProtos {
         return this;
       }
 
+      // optional string writer_cls_name = 4;
+      private java.lang.Object writerClsName_ = "";
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public boolean hasWriterClsName() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public java.lang.String getWriterClsName() {
+        java.lang.Object ref = writerClsName_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          writerClsName_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public com.google.protobuf.ByteString
+          getWriterClsNameBytes() {
+        java.lang.Object ref = writerClsName_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          writerClsName_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public Builder setWriterClsName(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        writerClsName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public Builder clearWriterClsName() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        writerClsName_ = getDefaultInstance().getWriterClsName();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string writer_cls_name = 4;</code>
+       */
+      public Builder setWriterClsNameBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        writerClsName_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:WALHeader)
     }
 
@@ -7664,36 +7829,37 @@ public final class WALProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
+      "\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" +
       "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
-      "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
-      "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
-      "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
-      " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
-      " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
-      "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
-      "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
-      "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
-      "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
-      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
-      "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
-      "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
-      " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
-      "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
-      "\t\022\023\n\013region_name\030\007 \001(\014\"\353\002\n\017FlushDescript" +
-      "or\022,\n\006action\030\001 \002(\0162\034.FlushDescriptor.Flu" +
-      "shAction\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_" +
-      "region_name\030\003 \002(\014\022\035\n\025flush_sequence_numb",
-      "er\030\004 \001(\004\022<\n\rstore_flushes\030\005 \003(\0132%.FlushD" +
-      "escriptor.StoreFlushDescriptor\032Y\n\024StoreF" +
-      "lushDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016s" +
-      "tore_home_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(" +
-      "\t\"A\n\013FlushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COM" +
-      "MIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"\014\n\nWALTrail" +
-      "er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" +
-      "AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" +
-      "g.apache.hadoop.hbase.protobuf.generated" +
-      "B\tWALProtosH\001\210\001\000\240\001\001"
+      "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" +
+      "riter_cls_name\030\004 \001(\t\"\240\002\n\006WALKey\022\033\n\023encod" +
+      "ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" +
+      "\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" +
+      "ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" +
+      "\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" +
+      "ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" +
+      "\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(",
+      "\004\022\034\n\024orig_sequence_number\030\013 \001(\004\"=\n\013Famil" +
+      "yScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002" +
+      "(\0162\n.ScopeType\"\276\001\n\024CompactionDescriptor\022" +
+      "\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded_region_na" +
+      "me\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compact" +
+      "ion_input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003" +
+      "(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region_nam" +
+      "e\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006action\030\001 " +
+      "\002(\0162\034.FlushDescriptor.FlushAction\022\022\n\ntab" +
+      "le_name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002",
+      "(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n\rsto" +
+      "re_flushes\030\005 \003(\0132%.FlushDescriptor.Store" +
+      "FlushDescriptor\032Y\n\024StoreFlushDescriptor\022" +
+      "\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002" +
+      " \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushActio" +
+      "n\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013" +
+      "ABORT_FLUSH\020\002\"\014\n\nWALTrailer*F\n\tScopeType" +
+      "\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICA" +
+      "TION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop" +
+      ".hbase.protobuf.generatedB\tWALProtosH\001\210\001",
+      "\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7705,7 +7871,7 @@ public final class WALProtos {
           internal_static_WALHeader_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALHeader_descriptor,
-              new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", });
+              new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", });
           internal_static_WALKey_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_WALKey_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index f14d5f4..6c05c75 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -27,6 +27,7 @@ message WALHeader {
   optional bool has_compression = 1;
   optional bytes encryption_key = 2;
   optional bool has_tag_compression = 3;
+  optional string writer_cls_name = 4;
 }
 
 // Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
index 435a3e3..276e16c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
@@ -168,6 +168,10 @@ public class HLogFactory {
      */
     private static Class<? extends Writer> logWriterClass;
 
+    static void resetLogWriterClass() {
+      logWriterClass = null;
+    }
+
     /**
      * Create a writer for the WAL.
      * @return A WAL writer.  Close when done with it.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 11ef770..8f0f1c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -23,7 +23,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,7 +68,16 @@ public class ProtobufLogReader extends ReaderBase {
   // in the hlog, the inputstream's position is equal to walEditsStopOffset.
   private long walEditsStopOffset;
   private boolean trailerPresent;
+  private static List<String> writerClsNames = new ArrayList<String>();
+  static {
+    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
+  }
 
+  enum WALHdrResult {
+    EOF,                   // stream is at EOF when method starts
+    SUCCESS,
+    UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
+  }
   public ProtobufLogReader() {
     super();
   }
@@ -95,11 +106,26 @@ public class ProtobufLogReader extends ReaderBase {
     initInternal(stream, true);
   }
 
-  protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException {
-    return builder.mergeDelimitedFrom(stream);
+  /*
+   * Returns names of the accepted writer classes
+   */
+  protected List<String> getWriterClsNames() {
+    return writerClsNames;
+  }
+
+  protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream)
+      throws IOException {
+     boolean res = builder.mergeDelimitedFrom(stream);
+     if (!res) return WALHdrResult.EOF;
+     if (builder.hasWriterClsName() &&
+         !getWriterClsNames().contains(builder.getWriterClsName())) {
+       return WALHdrResult.UNKNOWN_WRITER_CLS;
+     }
+     return WALHdrResult.SUCCESS;
   }
 
-  private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
+  private void initInternal(FSDataInputStream stream, boolean isFirst)
+      throws IOException {
     close();
     long expectedPos = PB_WAL_MAGIC.length;
     if (stream == null) {
@@ -111,10 +137,13 @@ public class ProtobufLogReader extends ReaderBase {
     }
     // Initialize metadata or, when we reset, just skip the header.
     WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
-    boolean hasHeader = readHeader(builder, stream);
-    if (!hasHeader) {
+    WALHdrResult walHdrRes = readHeader(builder, stream);
+    if (walHdrRes == WALHdrResult.EOF) {
       throw new EOFException("Couldn't read WAL PB header");
     }
+    if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
+      throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
+    }
     if (isFirst) {
       WALProtos.WALHeader header = builder.build();
       this.hasCompression = header.hasHasCompression() && header.getHasCompression();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 682b954..74552f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -59,6 +59,9 @@ public class ProtobufLogWriter extends WriterBase {
   }
 
   protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
+    if (!builder.hasWriterClsName()) {
+      builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
index 0c9de4b..7b025f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.security.Key;
 import java.security.KeyException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,15 +40,25 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
   private static final Log LOG = LogFactory.getLog(SecureProtobufLogReader.class);
 
   private Decryptor decryptor = null;
+  private static List<String> writerClsNames = new ArrayList<String>();
+  static {
+    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
+    writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
+  }
+
+  @Override
+  protected List<String> getWriterClsNames() {
+    return writerClsNames;
+  }
 
   @Override
-  protected boolean readHeader(WALHeader.Builder builder, FSDataInputStream stream)
+  protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream)
       throws IOException {
-    boolean result = super.readHeader(builder, stream);
+    WALHdrResult result = super.readHeader(builder, stream);
     // We need to unconditionally handle the case where the WAL has a key in
     // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
     // no longer set in the site configuration.
-    if (result && builder.hasEncryptionKey()) {
+    if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) {
       // Serialized header data has been merged into the builder from the
       // stream.
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
index ee100fd..fa95388 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
@@ -44,6 +44,7 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
 
   @Override
   protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
+    builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
     if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
       // Get an instance of our cipher
       Cipher cipher = Encryption.getCipher(conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1429610/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
new file mode 100644
index 0000000..02d59f9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.log4j.Level;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/*
+ * Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader
+ */
+@Category(MediumTests.class)
+public class TestHLogReaderOnSecureHLog {
+  static final Log LOG = LogFactory.getLog(TestHLogReaderOnSecureHLog.class);
+  static {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
+      .getLogger().setLevel(Level.ALL);
+  };
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  final byte[] value = Bytes.toBytes("Test value");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
+    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+    conf.setBoolean("hbase.hlog.split.skip.errors", true);
+    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
+  }
+
+  private Path writeWAL(String tblName) throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    TableName tableName = TableName.valueOf(tblName);
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(tableName.getName()));
+    HRegionInfo regioninfo = new HRegionInfo(tableName,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+    final int total = 10;
+    final byte[] row = Bytes.toBytes("row");
+    final byte[] family = Bytes.toBytes("family");
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path logDir = TEST_UTIL.getDataTestDir(tblName);
+    final AtomicLong sequenceId = new AtomicLong(1);
+
+    // Write the WAL
+    FSHLog wal = new FSHLog(fs, TEST_UTIL.getDataTestDir(), logDir.toString(), conf);
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
+    }
+    final Path walPath = ((FSHLog) wal).computeFilename();
+    wal.close();
+    
+    return walPath;
+  }
+  
+  @Test()
+  public void testHLogReaderOnSecureHLog() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HLogFactory.resetLogReaderClass();
+    HLogFactory.resetLogWriterClass();
+    conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+      HLog.Reader.class);
+    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
+      HLog.Writer.class);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path walPath = writeWAL("testHLogReaderOnSecureHLog");
+
+    // Insure edits are not plaintext
+    long length = fs.getFileStatus(walPath).getLen();
+    FSDataInputStream in = fs.open(walPath);
+    byte[] fileData = new byte[(int)length];
+    IOUtils.readFully(in, fileData);
+    in.close();
+    assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
+
+    // Confirm the WAL cannot be read back by ProtobufLogReader
+    try {
+      HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
+      assertFalse(true);
+    } catch (IOException ioe) {
+      // expected IOE
+    }
+    
+    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+    Path rootdir = FSUtils.getRootDir(conf);
+    try {
+      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+      s.splitLogFile(listStatus[0], null);
+      Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
+        "corrupt");
+      assertTrue(fs.exists(file));
+      // assertFalse("log splitting should have failed", true);
+    } catch (IOException ioe) {
+      assertTrue("WAL should have been sidelined", false);
+    }
+  }
+  
+  @Test()
+  public void testSecureHLogReaderOnHLog() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HLogFactory.resetLogReaderClass();
+    HLogFactory.resetLogWriterClass();
+    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
+      HLog.Reader.class);
+    conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
+      HLog.Writer.class);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path walPath = writeWAL("testSecureHLogReaderOnHLog");
+
+    // Ensure edits are plaintext
+    long length = fs.getFileStatus(walPath).getLen();
+    FSDataInputStream in = fs.open(walPath);
+    byte[] fileData = new byte[(int)length];
+    IOUtils.readFully(in, fileData);
+    in.close();
+    assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
+
+    // Confirm the WAL can be read back by SecureProtobufLogReader
+    try {
+      HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
+    } catch (IOException ioe) {
+      assertFalse(true);
+    }
+    
+    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+    Path rootdir = FSUtils.getRootDir(conf);
+    try {
+      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+      s.splitLogFile(listStatus[0], null);
+      Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
+        "corrupt");
+      assertTrue(!fs.exists(file));
+    } catch (IOException ioe) {
+      assertTrue("WAL should have been processed", false);
+    }
+  }
+}


Mime
View raw message