hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [4/9] hbase git commit: HBASE-9531 a command line (hbase shell) interface to retreive the replication metrics and show replication lag
Date Thu, 12 Feb 2015 22:49:45 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-protocol/src/main/protobuf/ClusterStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 2b2d9eb..bb531cc 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -119,6 +119,19 @@ message RegionLoad {
 
 /* Server-level protobufs */
 
+message ReplicationLoadSink {
+  required uint64 ageOfLastAppliedOp = 1;
+  required uint64 timeStampsOfLastAppliedOp = 2;
+}
+
+message ReplicationLoadSource {
+  required string peerID = 1;
+  required uint64 ageOfLastShippedOp = 2;
+  required uint32 sizeOfLogQueue = 3;
+  required uint64 timeStampOfLastShippedOp = 4;
+  required uint64 replicationLag = 5;
+}
+
 message ServerLoad {
   /** Number of requests since last report. */
   optional uint32 number_of_requests = 1;
@@ -160,6 +173,16 @@ message ServerLoad {
    * The port number that this region server is hosing an info server on.
    */
   optional uint32 info_server_port = 9;
+
+  /**
+   * The replicationLoadSource for the replication Source status of this region server.
+   */
+  repeated ReplicationLoadSource replLoadSource = 10;
+
+  /**
+   * The replicationLoadSink for the replication Sink status of this region server.
+   */
+  optional ReplicationLoadSink replLoadSink = 11;
 }
 
 message LiveServerInfo {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f215d7e..ed5ff41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -1130,6 +1131,22 @@ public class HRegionServer extends HasThread implements
     } else {
       serverLoad.setInfoServerPort(-1);
     }
+
+    // for the replicationLoad purpose. Only need to get from one service
+    // either source or sink will get the same info
+    ReplicationSourceService rsources = getReplicationSourceService();
+
+    if (rsources != null) {
+      // always refresh first to get the latest value
+      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
+      if (rLoad != null) {
+        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
+        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList())
{
+          serverLoad.addReplLoadSource(rLS);
+        }
+      }
+    }
+
     return serverLoad.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 92ac823..25a27a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,11 +22,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
- * Gateway to Cluster Replication.  
+ * Gateway to Cluster Replication.
  * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  * One such application is a cross-datacenter
  * replication service that can keep two hbase clusters in sync.
@@ -52,4 +53,9 @@ public interface ReplicationService {
    * Stops replication service.
    */
   void stopReplicationService();
+
+  /**
+   * Refresh and Get ReplicationLoad
+   */
+  public ReplicationLoad refreshAndGetReplicationLoad();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 0c9d016..37dc1dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -71,4 +71,21 @@ public class MetricsSink {
     mss.incrAppliedOps(batchSize);
   }
 
+  /**
+   * Get the Age of Last Applied Op
+   * @return ageOfLastAppliedOp
+   */
+  public long getAgeOfLastAppliedOp() {
+    return mss.getLastAppliedOpAge();
+  }
+
+  /**
+   * Get the TimeStampOfLastAppliedOp. If no replication Op applied yet, the value is the
timestamp
+   * at which hbase instance starts
+   * @return timeStampsOfLastAppliedOp;
+   */
+  public long getTimeStampOfLastAppliedOp() {
+    return this.lastTimestampForAge;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index a734b9c..21296a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -36,6 +36,7 @@ public class MetricsSource {
 
   private long lastTimestamp = 0;
   private int lastQueueSize = 0;
+  private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
@@ -46,6 +47,7 @@ public class MetricsSource {
    * @param id Name of the source this class is monitoring
    */
   public MetricsSource(String id) {
+    this.id = id;
     singleSourceSource =
         CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
             .getSource(id);
@@ -143,4 +145,36 @@ public class MetricsSource {
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
     lastQueueSize = 0;
   }
+
+  /**
+   * Get AgeOfLastShippedOp
+   * @return AgeOfLastShippedOp
+   */
+  public Long getAgeOfLastShippedOp() {
+    return singleSourceSource.getLastShippedAge();
+  }
+
+  /**
+   * Get the sizeOfLogQueue
+   * @return sizeOfLogQueue
+   */
+  public int getSizeOfLogQueue() {
+    return this.lastQueueSize;
+  }
+
+  /**
+   * Get the timeStampsOfLastShippedOp
+   * @return lastTimestampForAge
+   */
+  public long getTimeStampOfLastShippedOp() {
+    return lastTimestamp;
+  }
+
+  /**
+   * Get the slave peer ID
+   * @return peerID
+   */
+  public String getPeerID() {
+    return id;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4729644..78bb92e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -66,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
 @InterfaceAudience.Private
-public class Replication extends WALActionsListener.Base implements 
+public class Replication extends WALActionsListener.Base implements
   ReplicationSourceService, ReplicationSinkService {
   private static final Log LOG =
       LogFactory.getLog(Replication.class);
@@ -82,6 +83,8 @@ public class Replication extends WALActionsListener.Base implements
   /** Statistics thread schedule pool */
   private ScheduledExecutorService scheduleThreadPool;
   private int statsThreadPeriod;
+  // ReplicationLoad to access replication metrics
+  private ReplicationLoad replicationLoad;
 
   /**
    * Instantiate the replication management (if rep is enabled).
@@ -138,11 +141,13 @@ public class Replication extends WALActionsListener.Base implements
       this.statsThreadPeriod =
           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
+      this.replicationLoad = new ReplicationLoad();
     } else {
       this.replicationManager = null;
       this.replicationQueues = null;
       this.replicationPeers = null;
       this.replicationTracker = null;
+      this.replicationLoad = null;
     }
   }
 
@@ -310,4 +315,29 @@ public class Replication extends WALActionsListener.Base implements
       }
     }
   }
+
+  @Override
+  public ReplicationLoad refreshAndGetReplicationLoad() {
+    if (this.replicationLoad == null) {
+      return null;
+    }
+    // always build for latest data
+    buildReplicationLoad();
+    return this.replicationLoad;
+  }
+
+  private void buildReplicationLoad() {
+    // get source
+    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
+    List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
+
+    for (ReplicationSourceInterface source : sources) {
+      if (source instanceof ReplicationSource) {
+        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
+      }
+    }
+    // get sink
+    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
+    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
new file mode 100644
index 0000000..b3f3ecb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright 2014 The Apache Software Foundation Licensed to the Apache Software Foundation
(ASF)
+ * under one or more contributor license agreements. See the NOTICE file distributed with
this work
+ * for additional information regarding copyright ownership. The ASF licenses this file to
you under
+ * the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed
to in
+ * writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific
+ * language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Strings;
+
+/**
+ * This class is used for exporting some of the info from replication metrics
+ */
+@InterfaceAudience.Private
+public class ReplicationLoad {
+
+  // Empty load instance.
+  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
+
+  private List<MetricsSource> sourceMetricsList;
+  private MetricsSink sinkMetrics;
+
+  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
+  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
+
+  /** default constructor */
+  public ReplicationLoad() {
+    super();
+  }
+
+  /**
+   * buildReplicationLoad
+   * @param srMetricsList
+   * @param skMetrics
+   */
+
+  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
+      final MetricsSink skMetrics) {
+    this.sourceMetricsList = srMetricsList;
+    this.sinkMetrics = skMetrics;
+
+    // build the SinkLoad
+    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
+        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
+    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
+    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimeStampOfLastAppliedOp());
+    this.replicationLoadSink = rLoadSinkBuild.build();
+
+    // build the SourceLoad List
+    this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
+    for (MetricsSource sm : this.sourceMetricsList) {
+      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
+      int sizeOfLogQueue = sm.getSizeOfLogQueue();
+      long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
+      long replicationLag;
+      long timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+      if (sizeOfLogQueue != 0) {
+        // err on the large side
+        replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+      } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+        replicationLag = ageOfLastShippedOp; // last shipped happen recently
+      } else {
+        // last shipped may happen last night,
+        // so NO real lag although ageOfLastShippedOp is non-zero
+        replicationLag = 0;
+      }
+
+      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
+          ClusterStatusProtos.ReplicationLoadSource.newBuilder();
+      rLoadSourceBuild.setPeerID(sm.getPeerID());
+      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
+      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
+      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
+      rLoadSourceBuild.setReplicationLag(replicationLag);
+
+      this.replicationLoadSourceList.add(rLoadSourceBuild.build());
+    }
+
+  }
+
+  /**
+   * sourceToString
+   * @return a string contains sourceReplicationLoad information
+   */
+  public String sourceToString() {
+    if (this.sourceMetricsList == null) return null;
+
+    StringBuilder sb = new StringBuilder();
+
+    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList)
{
+
+      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
+      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
+      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
+      sb =
+          Strings.appendKeyValue(sb, "TimeStampsOfLastShippedOp",
+            (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
+      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * sinkToString
+   * @return a string contains sinkReplicationLoad information
+   */
+  public String sinkToString() {
+    if (this.replicationLoadSink == null) return null;
+
+    StringBuilder sb = new StringBuilder();
+    sb =
+        Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
+          this.replicationLoadSink.getAgeOfLastAppliedOp());
+    sb =
+        Strings.appendKeyValue(sb, "TimeStampsOfLastAppliedOp",
+          (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
+
+    return sb.toString();
+  }
+
+  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
+    return this.replicationLoadSink;
+  }
+
+  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList()
{
+    return this.replicationLoadSourceList;
+  }
+
+  /**
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 9a60131..3276418 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -254,4 +254,12 @@ public class ReplicationSink {
       "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
       ", total replicated edits: " + this.totalReplicatedEdits;
   }
+
+  /**
+   * Get replication Sink Metrics
+   * @return MetricsSink
+   */
+  public MetricsSink getSinkMetrics() {
+    return this.metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ee43956..714080f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -869,4 +869,12 @@ public class ReplicationSource extends Thread
       ", currently replicating from: " + this.currentPath +
       " at position: " + position;
   }
+
+  /**
+   * Get Replication Source Metrics
+   * @return sourceMetrics
+   */
+  public MetricsSource getSourceMetrics() {
+    return this.metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 4163b66..d8d735f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -31,13 +31,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -556,4 +560,45 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     hadmin.close();
   }
 
+  /**
+   * Test for HBASE-9531
+   * put a few rows into htable1, which should be replicated to htable2
+   * create a ClusterStatus instance 'status' from HBaseAdmin
+   * test : status.getLoad(server).getReplicationLoadSourceList()
+   * test : status.getLoad(server).getReplicationLoadSink()
+   * * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testReplicationStatus() throws Exception {
+    LOG.info("testReplicationStatus");
+
+    try (Admin admin = utility1.getConnection().getAdmin()) {
+
+      final byte[] qualName = Bytes.toBytes("q");
+      Put p;
+
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes("row" + i));
+        p.add(famName, qualName, Bytes.toBytes("val" + i));
+        htable1.put(p);
+      }
+
+      ClusterStatus status = admin.getClusterStatus();
+
+      for (ServerName server : status.getServers()) {
+        ServerLoad sl = status.getLoad(server);
+        List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
+        ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
+
+        // check SourceList has at least one entry
+        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() >
0));
+
+        // check Sink exist only as it is difficult to verify the value on the fly
+        assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
+          (rLoadSink.getAgeOfLastAppliedOp() >= 0));
+        assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
+          (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index c0ea862..35ee36c 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -608,7 +608,7 @@ module Hbase
       end
     end
 
-    def status(format)
+    def status(format, type)
       status = @admin.getClusterStatus()
       if format == "detailed"
         puts("version %s" % [ status.getHBaseVersion() ])
@@ -635,6 +635,46 @@ module Hbase
         for server in status.getDeadServerNames()
           puts("    %s" % [ server ])
         end
+      elsif format == "replication"
+        #check whether replication is enabled or not
+        if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_KEY,

+          org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_DEFAULT))
+          puts("Please enable replication first.")
+        else
+          puts("version %s" % [ status.getHBaseVersion() ])
+          puts("%d live servers" % [ status.getServersSize() ])
+          for server in status.getServers()
+            sl = status.getLoad(server)
+            rSinkString   = "       SINK  :"
+            rSourceString = "       SOURCE:"
+            rLoadSink = sl.getReplicationLoadSink()
+            rSinkString << " AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s
+            rSinkString << ", TimeStampsOfLastAppliedOp=" + 
+			    (java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString()
+            rLoadSourceList = sl.getReplicationLoadSourceList()
+            index = 0
+            while index < rLoadSourceList.size()
+              rLoadSource = rLoadSourceList.get(index)
+              rSourceString << " PeerID=" + rLoadSource.getPeerID()
+              rSourceString << ", AgeOfLastShippedOp=" + rLoadSource.getAgeOfLastShippedOp().to_s
+              rSourceString << ", SizeOfLogQueue=" + rLoadSource.getSizeOfLogQueue().to_s
+              rSourceString << ", TimeStampsOfLastShippedOp=" + 
+			      (java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString()
+              rSourceString << ", Replication Lag=" + rLoadSource.getReplicationLag().to_s
+              index = index + 1
+            end
+            puts("    %s:" %
+            [ server.getHostname() ])
+            if type.casecmp("SOURCE") == 0
+              puts("%s" % rSourceString)
+            elsif type.casecmp("SINK") == 0
+              puts("%s" % rSinkString)
+            else
+              puts("%s" % rSourceString)
+              puts("%s" % rSinkString)
+            end
+          end
+        end
       elsif format == "simple"
         load = 0
         regions = 0

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-shell/src/main/ruby/shell/commands/status.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/status.rb b/hbase-shell/src/main/ruby/shell/commands/status.rb
index f72c13c..b22b272 100644
--- a/hbase-shell/src/main/ruby/shell/commands/status.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/status.rb
@@ -22,18 +22,21 @@ module Shell
     class Status < Command
       def help
         return <<-EOF
-Show cluster status. Can be 'summary', 'simple', or 'detailed'. The
+Show cluster status. Can be 'summary', 'simple', 'detailed', or 'replication'. The
 default is 'summary'. Examples:
 
   hbase> status
   hbase> status 'simple'
   hbase> status 'summary'
   hbase> status 'detailed'
+  hbase> status 'replication'
+  hbase> status 'replication', 'source'
+  hbase> status 'replication', 'sink'
 EOF
       end
 
-      def command(format = 'summary')
-        admin.status(format)
+      def command(format = 'summary',type = 'both')
+        admin.status(format, type)
       end
     end
   end

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-shell/src/test/ruby/hbase/admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index caede3a..1925864 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -356,5 +356,17 @@ module Hbase
       assert_not_equal(nil, table)
       table.close
     end
+
+    define_test "Get replication status" do
+      replication_status("replication", "both")
+    end
+
+    define_test "Get replication source metrics information" do
+      replication_status("replication", "source")
+    end
+
+    define_test "Get replication sink metrics information" do
+      replication_status("replication", "sink")
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5b40200/hbase-shell/src/test/ruby/test_helper.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/test_helper.rb b/hbase-shell/src/test/ruby/test_helper.rb
index 5579761..5dfafc5 100644
--- a/hbase-shell/src/test/ruby/test_helper.rb
+++ b/hbase-shell/src/test/ruby/test_helper.rb
@@ -94,6 +94,10 @@ module Hbase
         puts "IGNORING DROP TABLE ERROR: #{e}"
       end
     end
+
+    def replication_status(format,type)
+      return admin.status(format,type)
+    end
   end
 end
 


Mime
View raw message