hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ashishsin...@apache.org
Subject hbase git commit: HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL
Date Fri, 01 Apr 2016 10:26:12 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 d7d12aedd -> bcbef7b40


HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bcbef7b4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bcbef7b4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bcbef7b4

Branch: refs/heads/branch-1
Commit: bcbef7b401e211ad7cfcdcd176abafe7f30dbbe8
Parents: d7d12ae
Author: Ashish Singhi <ashishsinghi@apache.org>
Authored: Fri Apr 1 15:55:08 2016 +0530
Committer: Ashish Singhi <ashishsinghi@apache.org>
Committed: Fri Apr 1 15:55:08 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  4 +--
 .../hbase/regionserver/wal/MetricsWAL.java      |  7 +++-
 .../regionserver/wal/WALActionsListener.java    | 10 ++++--
 .../replication/regionserver/Replication.java   | 35 +++++++++++++++++---
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  4 +--
 .../hbase/regionserver/wal/TestMetricsWAL.java  |  4 +--
 .../hbase/wal/WALPerformanceEvaluation.java     |  3 +-
 7 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index eb1cf57..c01cc1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1399,14 +1399,14 @@ public class FSHLog implements WAL {
     }
   }
 
-  private long postAppend(final Entry e, final long elapsedTime) {
+  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
     long len = 0;
     if (!listeners.isEmpty()) {
       for (Cell cell : e.getEdit().getCells()) {
         len += CellUtil.estimatedSerializedSizeOf(cell);
       }
       for (WALActionsListener listener : listeners) {
-        listener.postAppend(len, elapsedTime);
+        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
       }
     }
     return len;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index 99792e5..69a31cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -20,9 +20,13 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.util.StringUtils;
 
@@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base {
   }
 
   @Override
-  public void postAppend(final long size, final long time) {
+  public void postAppend(final long size, final long time, final WALKey logkey,
+      final WALEdit logEdit) throws IOException {
     source.incrementAppendCount();
     source.incrementAppendTime(time);
     source.incrementAppendSize(size);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index db98083..60ab7b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -101,8 +101,12 @@ public interface WALActionsListener {
    * TODO: Combine this with above.
    * @param entryLen approx length of cells in this append.
    * @param elapsedTimeMillis elapsed time in milliseconds.
+   * @param logKey A WAL key
+   * @param logEdit A WAL edit containing list of cells.
+   * @throws IOException if any network or I/O occurred
    */
-  void postAppend(final long entryLen, final long elapsedTimeMillis);
+  void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
+      final WALEdit logEdit) throws IOException;
 
   /**
    * For notification post writer sync.  Used by metrics system at least.
@@ -140,7 +144,9 @@ public interface WALActionsListener {
     }
 
     @Override
-    public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
+    public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
+        final WALEdit logEdit) throws IOException {
+    }
 
     @Override
     public void postSync(final long timeInNanos, final int handlerSyncs) {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 7110273..06138a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -262,6 +262,36 @@ public class Replication extends WALActionsListener.Base implements
     scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
+  @Override
+  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
+      final WALEdit edit) throws IOException {
+    NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+    if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
+      TableName tableName = logKey.getTablename();
+      for (Cell c : edit.getCells()) {
+        // Only check for bulk load events
+        if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
+          BulkLoadDescriptor bld = null;
+          try {
+            bld = WALEdit.getBulkLoadDescriptor(c);
+          } catch (IOException e) {
+            LOG.error("Failed to get bulk load events information from the wal file.", e);
+            throw e;
+          }
+
+          for (StoreDescriptor s : bld.getStoresList()) {
+            byte[] fam = s.getFamilyName().toByteArray();
+            // We have already scoped the entries as part
+            // WALActionsListener#visitLogEntryBeforeWrite notification
+            if (scopes.containsKey(fam)) {
+              addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
    * compaction WAL edits and if the scope is local.
@@ -314,10 +344,7 @@ public class Replication extends WALActionsListener.Base implements
           int scope = htd.getFamily(family).getScope();
           if (scope != REPLICATION_SCOPE_LOCAL) {
             scopes.put(family, scope);
-            addHFileRefsToQueue(replicationManager, tableName, family, s);
           }
-        } else {
-          addHFileRefsToQueue(replicationManager, tableName, family, s);
         }
       }
     } catch (IOException e) {
@@ -331,7 +358,7 @@ public class Replication extends WALActionsListener.Base implements
     try {
       replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
     } catch (ReplicationException e) {
-      LOG.error("Failed to create hfile references in ZK.", e);
+      LOG.error("Failed to add hfile references in the replication queue.", e);
       throw new IOException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 43738be..1816238 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -155,7 +155,7 @@ class DisabledWALProvider implements WALProvider {
 
     @Override
     public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-                       boolean inMemstore) {
+        boolean inMemstore) throws IOException {
       if (!this.listeners.isEmpty()) {
         final long start = System.nanoTime();
         long len = 0;
@@ -164,7 +164,7 @@ class DisabledWALProvider implements WALProvider {
         }
         final long elapsed = (System.nanoTime() - start)/1000000l;
         for (WALActionsListener listener : this.listeners) {
-          listener.postAppend(len, elapsed);
+          listener.postAppend(len, elapsed, key, edits);
         }
       }
       return -1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
index 7be3e66..9a7d494 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
@@ -57,8 +57,8 @@ public class TestMetricsWAL {
   public void testWalWrittenInBytes() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.postAppend(100, 900);
-    metricsWAL.postAppend(200, 2000);
+    metricsWAL.postAppend(100, 900, null, null);
+    metricsWAL.postAppend(200, 2000, null, null);
     verify(source, times(1)).incrementWrittenBytes(100);
     verify(source, times(1)).incrementWrittenBytes(200);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bcbef7b4/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index 3af853b..8af1882 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -514,7 +514,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
         }
 
         @Override
-        public void postAppend(final long size, final long elapsedTime) {
+        public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
+            final WALEdit logEdit) {
           appendMeter.mark(size);
         }
       });


Mime
View raw message