hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-15093 Replication can report incorrect size of log queue for the global source when multiwal is enabled (Ashu Pachauri)
Date Mon, 11 Apr 2016 15:17:29 GMT
Repository: hbase
Updated Branches:
  refs/heads/master a395922ad -> 8541fe4ad


HBASE-15093 Replication can report incorrect size of log queue for the global source when
multiwal is enabled (Ashu Pachauri)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8541fe4a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8541fe4a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8541fe4a

Branch: refs/heads/master
Commit: 8541fe4ad10737efbec3734d3ba4d835c51afa7d
Parents: a395922
Author: tedyu <yuzhihong@gmail.com>
Authored: Mon Apr 11 08:17:20 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Mon Apr 11 08:17:20 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java           |  2 +-
 .../MetricsReplicationGlobalSourceSource.java     |  9 +++++----
 .../MetricsReplicationSourceSourceImpl.java       |  9 +++++----
 .../replication/regionserver/MetricsSource.java   | 18 ++++++++++--------
 .../regionserver/ReplicationSource.java           | 14 ++++++--------
 5 files changed, 27 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8541fe4a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 188c3a3..3aa01ab 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -36,7 +36,6 @@ public interface MetricsReplicationSourceSource {
   public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
 
   void setLastShippedAge(long age);
-  void setSizeOfLogQueue(int size);
   void incrSizeOfLogQueue(int size);
   void decrSizeOfLogQueue(int size);
   void incrLogEditsFiltered(long size);
@@ -47,6 +46,7 @@ public interface MetricsReplicationSourceSource {
   void incrLogReadInEdits(long size);
   void clear();
   long getLastShippedAge();
+  int getSizeOfLogQueue();
   void incrHFilesShipped(long hfiles);
   void incrSizeOfHFileRefsQueue(long size);
   void decrSizeOfHFileRefsQueue(long size);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8541fe4a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 93b10b6..2526f32 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -64,10 +64,6 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     ageOfLastShippedOpGauge.set(age);
   }
 
-  @Override public void setSizeOfLogQueue(int size) {
-    sizeOfLogQueueGauge.set(size);
-  }
-
   @Override public void incrSizeOfLogQueue(int size) {
     sizeOfLogQueueGauge.incr(size);
   }
@@ -121,4 +117,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public void decrSizeOfHFileRefsQueue(long size) {
     sizeOfHFileRefsQueueGauge.decr(size);
   }
+
+  @Override
+  public int getSizeOfLogQueue() {
+    return (int)sizeOfLogQueueGauge.value();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8541fe4a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 9941712..03e3116 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -85,10 +85,6 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     ageOfLastShippedOpGauge.set(age);
   }
 
-  @Override public void setSizeOfLogQueue(int size) {
-    sizeOfLogQueueGauge.set(size);
-  }
-
   @Override public void incrSizeOfLogQueue(int size) {
     sizeOfLogQueueGauge.incr(size);
   }
@@ -158,4 +154,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public void decrSizeOfHFileRefsQueue(long size) {
     sizeOfHFileRefsQueueGauge.decr(size);
   }
+
+  @Override
+  public int getSizeOfLogQueue() {
+    return (int)sizeOfLogQueueGauge.value();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8541fe4a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 9687af7..4a044bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -89,14 +89,16 @@ public class MetricsSource {
   }
 
   /**
-   * Set the size of the log queue
-   *
-   * @param size the size.
+   * Increment size of the log queue.
    */
-  public void setSizeOfLogQueue(int size) {
-    singleSourceSource.setSizeOfLogQueue(size);
-    globalSourceSource.incrSizeOfLogQueue(size - lastQueueSize);
-    lastQueueSize = size;
+  public void incrSizeOfLogQueue() {
+    singleSourceSource.incrSizeOfLogQueue(1);
+    globalSourceSource.incrSizeOfLogQueue(1);
+  }
+
+  public void decrSizeOfLogQueue() {
+    singleSourceSource.decrSizeOfLogQueue(1);
+    globalSourceSource.decrSizeOfLogQueue(1);
   }
 
   /**
@@ -186,7 +188,7 @@ public class MetricsSource {
    * @return sizeOfLogQueue
    */
   public int getSizeOfLogQueue() {
-    return this.lastQueueSize;
+    return singleSourceSource.getSizeOfLogQueue();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8541fe4a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 51ca7ed..7e58e41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -31,7 +31,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
@@ -138,7 +137,6 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
-  private AtomicInteger logQueueSize = new AtomicInteger(0);
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
 
@@ -221,10 +219,10 @@ public class ReplicationSource extends Thread
       }
     }
     queue.put(log);
-    int queueSize = logQueueSize.incrementAndGet();
-    this.metrics.setSizeOfLogQueue(queueSize);
+    this.metrics.incrSizeOfLogQueue();
     // This will log a warning for each new log that gets created above the warn threshold
-    if (queue.size() > this.logQueueWarnThreshold) {
+    int queueSize = queue.size();
+    if (queueSize > this.logQueueWarnThreshold) {
       LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
           + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
     }
@@ -510,7 +508,8 @@ public class ReplicationSource extends Thread
     private long currentNbHFiles = 0;
 
     public ReplicationSourceWorkerThread(String walGroupId,
-        PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+        PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
+        ReplicationSource source) {
       this.walGroupId = walGroupId;
       this.queue = queue;
       this.replicationQueueInfo = replicationQueueInfo;
@@ -769,8 +768,7 @@ public class ReplicationSource extends Thread
       try {
         if (this.currentPath == null) {
           this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
-          int queueSize = logQueueSize.decrementAndGet();
-          metrics.setSizeOfLogQueue(queueSize);
+          metrics.decrSizeOfLogQueue();
           if (this.currentPath != null) {
             // For recovered queue: must use peerClusterZnode since peerId is a parsed value
             manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,


Mime
View raw message