Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4A282200C86 for ; Wed, 31 May 2017 16:59:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 48682160BEB; Wed, 31 May 2017 14:59:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1E60B160BDB for ; Wed, 31 May 2017 16:59:47 +0200 (CEST) Received: (qmail 67059 invoked by uid 500); 31 May 2017 14:59:47 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 66962 invoked by uid 99); 31 May 2017 14:59:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 May 2017 14:59:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D319EE109B; Wed, 31 May 2017 14:59:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 31 May 2017 14:59:49 -0000 Message-Id: <868e0b1b453349e3bcc0aca400ca466f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/28] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Wed, 31 May 2017 14:59:50 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.ReplicationEndpointForTest.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.ReplicationEndpointForTest.html b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.ReplicationEndpointForTest.html new file mode 100644 index 0000000..ce6219c --- /dev/null +++ b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.ReplicationEndpointForTest.html @@ -0,0 +1,165 @@ + + + + + + +Uses of Class org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest (Apache HBase 2.0.0-SNAPSHOT Test API) + + + + + + + + + + +
+

Uses of Class
org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest

+
+ + + + + +

Copyright © 2007–2017 The Apache Software Foundation. All rights reserved.

+ + http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.html b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.html new file mode 100644 index 0000000..1741932 --- /dev/null +++ b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/class-use/TestReplicator.html @@ -0,0 +1,125 @@ + + + + + + +Uses of Class org.apache.hadoop.hbase.replication.regionserver.TestReplicator (Apache HBase 2.0.0-SNAPSHOT Test API) + + + + + + + + + + +
+

Uses of Class
org.apache.hadoop.hbase.replication.regionserver.TestReplicator

+
+
No usage of org.apache.hadoop.hbase.replication.regionserver.TestReplicator
+ + + + +

Copyright © 2007–2017 The Apache Software Foundation. All rights reserved.

+ + http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-frame.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-frame.html b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-frame.html index 09e0f24..7046dbd 100644 --- a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-frame.html +++ b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-frame.html @@ -28,6 +28,10 @@
  • TestReplicationSourceManager.FailInitializeDummyReplicationSource
  • TestReplicationSourceManagerZkImpl
  • TestReplicationThrottler
  • +
  • TestReplicator
  • +
  • TestReplicator.FailureInjectingReplicationEndpointForTest
  • +
  • TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface
  • +
  • TestReplicator.ReplicationEndpointForTest
  • TestSourceFSConfigurationProvider
  • TestTableBasedReplicationSourceManagerImpl
  • TestWALEntryStream
  • http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-summary.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-summary.html b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-summary.html index 8ed2473..61bdb5e 100644 --- a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-summary.html +++ b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-summary.html @@ -155,6 +155,22 @@   +TestReplicator +  + + +TestReplicator.FailureInjectingReplicationEndpointForTest +  + + +TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface +  + + +TestReplicator.ReplicationEndpointForTest +  + + TestSourceFSConfigurationProvider   http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-tree.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-tree.html b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-tree.html index e7fff70..13702c9 100644 --- a/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-tree.html +++ b/testdevapidocs/org/apache/hadoop/hbase/replication/regionserver/package-tree.html @@ -81,6 +81,27 @@
  • org.apache.hadoop.hbase.replication.TestReplicationEndpoint.EverythingPassesWALEntryFilter (implements org.apache.hadoop.hbase.replication.WALEntryFilter) @@ -3002,6 +3008,12 @@
  • org.apache.hadoop.hbase.replication.TestReplicationWithTags.TestCoprocessorForTagsAtSink (implements org.apache.hadoop.hbase.coprocessor.RegionObserver)
  • org.apache.hadoop.hbase.replication.TestReplicationWithTags.TestCoprocessorForTagsAtSource (implements org.apache.hadoop.hbase.coprocessor.RegionObserver)
  • org.apache.hadoop.hbase.master.cleaner.TestReplicationZKNodeCleaner
  • +
  • org.apache.hadoop.hbase.replication.regionserver.TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface (implements org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface)
  • +
  • org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest.ReplicatorForTest + +
  • org.apache.hadoop.hbase.client.TestReplicaWithCluster
  • org.apache.hadoop.hbase.client.TestReplicaWithCluster.RegionServerHostingPrimayMetaRegionSlowCopro (implements org.apache.hadoop.hbase.coprocessor.RegionObserver)
  • org.apache.hadoop.hbase.client.TestReplicaWithCluster.RegionServerStoppedCopro (implements org.apache.hadoop.hbase.coprocessor.RegionObserver)
  • http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface.html new file mode 100644 index 0000000..8654872 --- /dev/null +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.FailureInjectingReplicationEndpointForTest.FailureInjectingBlockingInterface.html @@ -0,0 +1,499 @@ + + + +Source code + + + +
    +
    001/*
    +002 * Licensed to the Apache Software Foundation (ASF) under one
    +003 * or more contributor license agreements.  See the NOTICE file
    +004 * distributed with this work for additional information
    +005 * regarding copyright ownership.  The ASF licenses this file
    +006 * to you under the Apache License, Version 2.0 (the
    +007 * "License"); you may not use this file except in compliance
    +008 * with the License.  You may obtain a copy of the License at
    +009 *
    +010 *     http://www.apache.org/licenses/LICENSE-2.0
    +011 *
    +012 * Unless required by applicable law or agreed to in writing, software
    +013 * distributed under the License is distributed on an "AS IS" BASIS,
    +014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +015 * See the License for the specific language governing permissions and
    +016 * limitations under the License.
    +017 */
    +018package org.apache.hadoop.hbase.replication.regionserver;
    +019
    +020import static org.junit.Assert.assertEquals;
    +021
    +022import java.io.IOException;
    +023import java.util.List;
    +024import java.util.concurrent.atomic.AtomicBoolean;
    +025
    +026import org.apache.commons.logging.Log;
    +027import org.apache.commons.logging.LogFactory;
    +028import org.apache.hadoop.fs.Path;
    +029import org.apache.hadoop.hbase.HBaseTestingUtility;
    +030import org.apache.hadoop.hbase.TableName;
    +031import org.apache.hadoop.hbase.Waiter;
    +032import org.apache.hadoop.hbase.client.HBaseAdmin;
    +033import org.apache.hadoop.hbase.client.Put;
    +034import org.apache.hadoop.hbase.ipc.RpcServer;
    +035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
    +036import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
    +037import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
    +038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.*;
    +039import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
    +040import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
    +041import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    +042import org.apache.hadoop.hbase.replication.TestReplicationBase;
    +043import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
    +044import org.apache.hadoop.hbase.testclassification.MediumTests;
    +045import org.apache.hadoop.hbase.wal.WAL.Entry;
    +046
    +047import org.junit.AfterClass;
    +048import org.junit.BeforeClass;
    +049import org.junit.Test;
    +050import org.junit.experimental.categories.Category;
    +051
    +052
    +053@Category(MediumTests.class)
    +054public class TestReplicator extends TestReplicationBase {
    +055
    +056  static final Log LOG = LogFactory.getLog(TestReplicator.class);
    +057  static final int NUM_ROWS = 10;
    +058
    +059  @BeforeClass
    +060  public static void setUpBeforeClass() throws Exception {
    +061    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
    +062    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
    +063    TestReplicationBase.setUpBeforeClass();
    +064    admin.removePeer("2"); // Remove the peer set up for us by base class
    +065  }
    +066
    +067  @Test
    +068  public void testReplicatorBatching() throws Exception {
    +069    // Clear the tables
    +070    truncateTable(utility1, tableName);
    +071    truncateTable(utility2, tableName);
    +072
    +073    // Replace the peer set up for us by the base class with a wrapper for this test
    +074    admin.addPeer("testReplicatorBatching",
    +075      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
    +076        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
    +077
    +078    ReplicationEndpointForTest.setBatchCount(0);
    +079    ReplicationEndpointForTest.setEntriesCount(0);
    +080    try {
    +081      ReplicationEndpointForTest.pause();
    +082      try {
    +083        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
    +084        // have to be replicated separately.
    +085        final byte[] valueBytes = new byte[8 *1024];
    +086        for (int i = 0; i < NUM_ROWS; i++) {
    +087          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
    +088            .addColumn(famName, null, valueBytes)
    +089          );
    +090        }
    +091      } finally {
    +092        ReplicationEndpointForTest.resume();
    +093      }
    +094
    +095      // Wait for replication to complete.
    +096      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
    +097        @Override
    +098        public boolean evaluate() throws Exception {
    +099          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
    +100        }
    +101
    +102        @Override
    +103        public String explainFailure() throws Exception {
    +104          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
    +105        }
    +106      });
    +107
    +108      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
    +109        ReplicationEndpointForTest.getBatchCount());
    +110      assertEquals("We did not replicate enough rows", NUM_ROWS,
    +111        utility2.countRows(htable2));
    +112    } finally {
    +113      admin.removePeer("testReplicatorBatching");
    +114    }
    +115  }
    +116
    +117  @Test
    +118  public void testReplicatorWithErrors() throws Exception {
    +119    // Clear the tables
    +120    truncateTable(utility1, tableName);
    +121    truncateTable(utility2, tableName);
    +122
    +123    // Replace the peer set up for us by the base class with a wrapper for this test
    +124    admin.addPeer("testReplicatorWithErrors",
    +125      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
    +126          .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
    +127        null);
    +128
    +129    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
    +130    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
    +131    try {
    +132      FailureInjectingReplicationEndpointForTest.pause();
    +133      try {
    +134        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
    +135        // have to be replicated separately.
    +136        final byte[] valueBytes = new byte[8 *1024];
    +137        for (int i = 0; i < NUM_ROWS; i++) {
    +138          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
    +139            .addColumn(famName, null, valueBytes)
    +140          );
    +141        }
    +142      } finally {
    +143        FailureInjectingReplicationEndpointForTest.resume();
    +144      }
    +145
    +146      // Wait for replication to complete.
    +147      // We can expect 10 batches
    +148      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
    +149        @Override
    +150        public boolean evaluate() throws Exception {
    +151          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
    +152        }
    +153
    +154        @Override
    +155        public String explainFailure() throws Exception {
    +156          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
    +157        }
    +158      });
    +159
    +160      assertEquals("We did not replicate enough rows", NUM_ROWS,
    +161        utility2.countRows(htable2));
    +162    } finally {
    +163      admin.removePeer("testReplicatorWithErrors");
    +164    }
    +165  }
    +166
    +167  @AfterClass
    +168  public static void tearDownAfterClass() throws Exception {
    +169    TestReplicationBase.tearDownAfterClass();
    +170  }
    +171
    +172  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
    +173    HBaseAdmin admin = util.getHBaseAdmin();
    +174    admin.disableTable(tableName);
    +175    admin.truncateTable(tablename, false);
    +176  }
    +177
    +178  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
    +179
    +180    private static int batchCount;
    +181    private static int entriesCount;
    +182    private static final Object latch = new Object();
    +183    private static AtomicBoolean useLatch = new AtomicBoolean(false);
    +184
    +185    public static void resume() {
    +186      useLatch.set(false);
    +187      synchronized (latch) {
    +188        latch.notifyAll();
    +189      }
    +190    }
    +191
    +192    public static void pause() {
    +193      useLatch.set(true);
    +194    }
    +195
    +196    public static void await() throws InterruptedException {
    +197      if (useLatch.get()) {
    +198        LOG.info("Waiting on latch");
    +199        latch.wait();
    +200        LOG.info("Waited on latch, now proceeding");
    +201      }
    +202    }
    +203
    +204    public static int getBatchCount() {
    +205      return batchCount;
    +206    }
    +207
    +208    public static void setBatchCount(int i) {
    +209      batchCount = i;
    +210    }
    +211
    +212    public static int getEntriesCount() {
    +213      return entriesCount;
    +214    }
    +215
    +216    public static void setEntriesCount(int i) {
    +217      entriesCount = i;
    +218    }
    +219
    +220    public class ReplicatorForTest extends Replicator {
    +221
    +222      public ReplicatorForTest(List<Entry> entries, int ordinal) {
    +223        super(entries, ordinal);
    +224      }
    +225
    +226      @Override
    +227      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
    +228          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
    +229          throws IOException {
    +230        try {
    +231          long size = 0;
    +232          for (Entry e: entries) {
    +233            size += e.getKey().estimatedSerializedSizeOf();
    +234            size += e.getEdit().estimatedSerializedSizeOf();
    +235          }
    +236          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
    +237              entries.size() + " entries with total size " + size + " bytes to " +
    +238              replicationClusterId);
    +239          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
    +240            hfileArchiveDir);
    +241          entriesCount += entries.size();
    +242          batchCount++;
    +243          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
    +244        } catch (IOException e) {
    +245          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
    +246          throw e;
    +247        }
    +248      }
    +249    }
    +250
    +251    @Override
    +252    public boolean replicate(ReplicateContext replicateContext) {
    +253      try {
    +254        await();
    +255      } catch (InterruptedException e) {
    +256        LOG.warn("Interrupted waiting for latch", e);
    +257      }
    +258      return super.replicate(replicateContext);
    +259    }
    +260
    +261    @Override
    +262    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    +263      return new ReplicatorForTest(entries, ordinal);
    +264    }
    +265  }
    +266
    +267  public static class FailureInjectingReplicationEndpointForTest
    +268      extends ReplicationEndpointForTest {
    +269
    +270    static class FailureInjectingBlockingInterface implements BlockingInterface {
    +271
    +272      private final BlockingInterface delegate;
    +273      private volatile boolean failNext;
    +274
    +275      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
    +276        this.delegate = delegate;
    +277      }
    +278
    +279      @Override
    +280      public GetRegionInfoResponse getRegionInfo(RpcController controller,
    +281          GetRegionInfoRequest request) throws ServiceException {
    +282        return delegate.getRegionInfo(controller, request);
    +283      }
    +284
    +285      @Override
    +286      public GetStoreFileResponse getStoreFile(RpcController controller,
    +287          GetStoreFileRequest request) throws ServiceException {
    +288        return delegate.getStoreFile(controller, request);
    +289      }
    +290
    +291      @Override
    +292      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
    +293          GetOnlineRegionRequest request) throws ServiceException {
    +294        return delegate.getOnlineRegion(controller, request);
    +295      }
    +296
    +297      @Override
    +298      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
    +299          throws ServiceException {
    +300        return delegate.openRegion(controller, request);
    +301      }
    +302
    +303      @Override
    +304      public WarmupRegionResponse warmupRegion(RpcController controller,
    +305          WarmupRegionRequest request) throws ServiceException {
    +306        return delegate.warmupRegion(controller, request);
    +307      }
    +308
    +309      @Override
    +310      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
    +311          throws ServiceException {
    +312        return delegate.closeRegion(controller, request);
    +313      }
    +314
    +315      @Override
    +316      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
    +317          throws ServiceException {
    +318        return delegate.flushRegion(controller, request);
    +319      }
    +320
    +321      @Override
    +322      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
    +323          throws ServiceException {
    +324        return delegate.splitRegion(controller, request);
    +325      }
    +326
    +327      @Override
    +328      public CompactRegionResponse compactRegion(RpcController controller,
    +329          CompactRegionRequest request) throws ServiceException {
    +330        return delegate.compactRegion(controller, request);
    +331      }
    +332
    +333      @Override
    +334      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
    +335          ReplicateWALEntryRequest request) throws ServiceException {
    +336        if (!failNext) {
    +337          failNext = true;
    +338          return delegate.replicateWALEntry(controller, request);
    +339        } else {
    +340          failNext = false;
    +341          throw new ServiceException("Injected failure");
    +342        }
    +343      }
    +344
    +345      @Override
    +346      public ReplicateWALEntryResponse replay(RpcController controller,
    +347          ReplicateWALEntryRequest request) throws ServiceException {
    +348        return delegate.replay(controller, request);
    +349      }
    +350
    +351      @Override
    +352      public RollWALWriterResponse rollWALWriter(RpcController controller,
    +353          RollWALWriterRequest request) throws ServiceException {
    +354        return delegate.rollWALWriter(controller, request);
    +355      }
    +356
    +357      @Override
    +358      public GetServerInfoResponse getServerInfo(RpcController controller,
    +359          GetServerInfoRequest request) throws ServiceException {
    +360        return delegate.getServerInfo(controller, request);
    +361      }
    +362
    +363      @Override
    +364      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
    +365          throws ServiceException {
    +366        return delegate.stopServer(controller, request);
    +367      }
    +368
    +369      @Override
    +370      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    +371          UpdateFavoredNodesRequest request) throws ServiceException {
    +372        return delegate.updateFavoredNodes(controller, request);
    +373      }
    +374
    +375      @Override
    +376      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
    +377          UpdateConfigurationRequest request) throws ServiceException {
    +378        return delegate.updateConfiguration(controller, request);
    +379      }
    +380
    +381      @Override
    +382      public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(RpcController controller,
    +383          CloseRegionForSplitOrMergeRequest request) throws ServiceException {
    +384        return delegate.closeRegionForSplitOrMerge(controller, request);
    +385      }
    +386
    +387      @Override
    +388      public GetRegionLoadResponse getRegionLoad(RpcController controller,
    +389          GetRegionLoadRequest request) throws ServiceException {
    +390        return delegate.getRegionLoad(controller, request);
    +391      }
    +392
    +393      @Override
    +394      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    +395          ClearCompactionQueuesRequest request) throws ServiceException {
    +396        return delegate.clearCompactionQueues(controller, request);
    +397      }
    +398
    +399      @Override
    +400      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
    +401          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
    +402        return delegate.getSpaceQuotaSnapshots(controller, request);
    +403      }
    +404    }
    +405
    +406    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
    +407
    +408      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
    +409        super(entries, ordinal);
    +410      }
    +411
    +412      @Override
    +413      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
    +414          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
    +415          throws IOException {
    +416        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
    +417          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    +418      }
    +419    }
    +420
    +421    @Override
    +422    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    +423      return new FailureInjectingReplicatorForTest(entries, ordinal);
    +424    }
    +425  }
    +426
    +427}
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    + +