From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Wed, 31 May 2017 14:59:46 -0000
Subject: [01/28] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

Repository: hbase-site
Updated Branches:
  refs/heads/asf-site 8fe8da7ba -> f4ba10c13


http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f4ba10c1/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.html
----------------------------------------------------------------------
diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.html
new file mode
index 0000000..8654872
--- /dev/null
+++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.html
@@ -0,0 +1,499 @@
+001/*
+002 * Licensed to the Apache Software Foundation (ASF) under one
+003 * or more contributor license agreements.  See the NOTICE file
+004 * distributed with this work for additional information
+005 * regarding copyright ownership.  The ASF licenses this file
+006 * to you under the Apache License, Version 2.0 (the
+007 * "License"); you may not use this file except in compliance
+008 * with the License.  You may obtain a copy of the License at
+009 *
+010 *     http://www.apache.org/licenses/LICENSE-2.0
+011 *
+012 * Unless required by applicable law or agreed to in writing, software
+013 * distributed under the License is distributed on an "AS IS" BASIS,
+014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+015 * See the License for the specific language governing permissions and
+016 * limitations under the License.
+017 */
+018package org.apache.hadoop.hbase.replication.regionserver;
+019
+020import static org.junit.Assert.assertEquals;
+021
+022import java.io.IOException;
+023import java.util.List;
+024import java.util.concurrent.atomic.AtomicBoolean;
+025
+026import org.apache.commons.logging.Log;
+027import org.apache.commons.logging.LogFactory;
+028import org.apache.hadoop.fs.Path;
+029import org.apache.hadoop.hbase.HBaseTestingUtility;
+030import org.apache.hadoop.hbase.TableName;
+031import org.apache.hadoop.hbase.Waiter;
+032import org.apache.hadoop.hbase.client.HBaseAdmin;
+033import org.apache.hadoop.hbase.client.Put;
+034import org.apache.hadoop.hbase.ipc.RpcServer;
+035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+036import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+037import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.*;
+039import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+040import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+041import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+042import org.apache.hadoop.hbase.replication.TestReplicationBase;
+043import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+044import org.apache.hadoop.hbase.testclassification.MediumTests;
+045import org.apache.hadoop.hbase.wal.WAL.Entry;
+046
+047import org.junit.AfterClass;
+048import org.junit.BeforeClass;
+049import org.junit.Test;
+050import org.junit.experimental.categories.Category;
+051
+052
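+// Exercises the Replicator of HBaseInterClusterReplicationEndpoint: with the RPC request size
+// capped at 10 KB and each edit carrying an 8 KB value, every row has to be shipped in its own
+// batch, and replication must still complete when the sink fails intermittently.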
+053@Category(MediumTests.class)
+054public class TestReplicator extends TestReplicationBase {
+055
+056  static final Log LOG = LogFactory.getLog(TestReplicator.class);
+057  static final int NUM_ROWS = 10;
+058
+059  @BeforeClass
+060  public static void setUpBeforeClass() throws Exception {
+061    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+062    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+063    TestReplicationBase.setUpBeforeClass();
+064    admin.removePeer("2"); // Remove the peer set up for us by base class
+065  }
+066
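+  // Queues NUM_ROWS edits of ~8 KB each while the endpoint is paused, then resumes and expects
+  // exactly NUM_ROWS batches (one row per batch under the 10 KB request cap) and NUM_ROWS rows
+  // at the sink.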
+067  @Test
+068  public void testReplicatorBatching() throws Exception {
+069    // Clear the tables
+070    truncateTable(utility1, tableName);
+071    truncateTable(utility2, tableName);
+072
+073    // Replace the peer set up for us by the base class with a wrapper for this test
+074    admin.addPeer("testReplicatorBatching",
+075      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+076        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+077
+078    ReplicationEndpointForTest.setBatchCount(0);
+079    ReplicationEndpointForTest.setEntriesCount(0);
+080    try {
+081      ReplicationEndpointForTest.pause();
+082      try {
+083        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+084        // have to be replicated separately.
+085        final byte[] valueBytes = new byte[8 * 1024];
+086        for (int i = 0; i < NUM_ROWS; i++) {
+087          htable1.put(new Put(("row" + i).getBytes())
+088            .addColumn(famName, null, valueBytes)
+089          );
+090        }
+091      } finally {
+092        ReplicationEndpointForTest.resume();
+093      }
+094
+095      // Wait for replication to complete.
+096      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+097        @Override
+098        public boolean evaluate() throws Exception {
+099          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+100        }
+101
+102        @Override
+103        public String explainFailure() throws Exception {
+104          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+105        }
+106      });
+107
+108      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+109        ReplicationEndpointForTest.getBatchCount());
+110      assertEquals("We did not replicate enough rows", NUM_ROWS,
+111        utility2.countRows(htable2));
+112    } finally {
+113      admin.removePeer("testReplicatorBatching");
+114    }
+115  }
+116
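+  // Same workload as testReplicatorBatching, but the peer uses
+  // FailureInjectingReplicationEndpointForTest, whose sink rejects every other replicateWALEntry
+  // call; the replicator must retry until all NUM_ROWS rows reach the peer cluster.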
+117  @Test
+118  public void testReplicatorWithErrors() throws Exception {
+119    // Clear the tables
+120    truncateTable(utility1, tableName);
+121    truncateTable(utility2, tableName);
+122
+123    // Replace the peer set up for us by the base class with a wrapper for this test
+124    admin.addPeer("testReplicatorWithErrors",
+125      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+126          .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
+127        null);
+128
+129    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+130    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+131    try {
+132      FailureInjectingReplicationEndpointForTest.pause();
+133      try {
+134        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+135        // have to be replicated separately.
+136        final byte[] valueBytes = new byte[8 * 1024];
+137        for (int i = 0; i < NUM_ROWS; i++) {
+138          htable1.put(new Put(("row" + i).getBytes())
+139            .addColumn(famName, null, valueBytes)
+140          );
+141        }
+142      } finally {
+143        FailureInjectingReplicationEndpointForTest.resume();
+144      }
+145
+146      // Wait for replication to complete.
+147      // With failures injected, the number of batch attempts is not deterministic, so wait on the entry count.
+148      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+149        @Override
+150        public boolean evaluate() throws Exception {
+151          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+152        }
+153
+154        @Override
+155        public String explainFailure() throws Exception {
+156          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+157        }
+158      });
+159
+160      assertEquals("We did not replicate enough rows", NUM_ROWS,
+161        utility2.countRows(htable2));
+162    } finally {
+163      admin.removePeer("testReplicatorWithErrors");
+164    }
+165  }
+166
+167  @AfterClass
+168  public static void tearDownAfterClass() throws Exception {
+169    TestReplicationBase.tearDownAfterClass();
+170  }
+171
+172  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
+173    HBaseAdmin admin = util.getHBaseAdmin();
+174    admin.disableTable(tablename); // use the passed-in table name, not the tableName field
+175    admin.truncateTable(tablename, false);
+176  }
+177
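+  // Endpoint used by the tests: counts shipped batches/entries and can be paused so that edits
+  // queue up before any batch goes out (see pause()/resume()/await()).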
+178  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+179
+180    private static volatile int batchCount; // volatile: written by replication threads, read by the test's Waiter
+181    private static volatile int entriesCount;
+182    private static final Object latch = new Object();
+183    private static AtomicBoolean useLatch = new AtomicBoolean(false);
+184
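+    // pause() makes replicate() block in await() until resume() clears the flag and notifies the
+    // latch, so the test can queue all of its puts before any batch is shipped.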
+185    public static void resume() {
+186      useLatch.set(false);
+187      synchronized (latch) {
+188        latch.notifyAll();
+189      }
+190    }
+191
+192    public static void pause() {
+193      useLatch.set(true);
+194    }
+195
+196    public static void await() throws InterruptedException {
+197      if (useLatch.get()) {
+198        LOG.info("Waiting on latch");
+199        synchronized (latch) { while (useLatch.get()) latch.wait(); } // wait() needs the monitor held; re-check the flag so a concurrent resume() is not missed
+200        LOG.info("Waited on latch, now proceeding");
+201      }
+202    }
+203
+204    public static int getBatchCount() {
+205      return batchCount;
+206    }
+207
+208    public static void setBatchCount(int i) {
+209      batchCount = i;
+210    }
+211
+212    public static int getEntriesCount() {
+213      return entriesCount;
+214    }
+215
+216    public static void setEntriesCount(int i) {
+217      entriesCount = i;
+218    }
+219
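+    // Counting replicator: logs each batch and bumps the shared counters after the real
+    // replicateEntries() succeeds.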
+220    public class ReplicatorForTest extends Replicator {
+221
+222      public ReplicatorForTest(List<Entry> entries, int ordinal) {
+223        super(entries, ordinal);
+224      }
+225
+226      @Override
+227      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+228          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+229          throws IOException {
+230        try {
+231          long size = 0;
+232          for (Entry e: entries) {
+233            size += e.getKey().estimatedSerializedSizeOf();
+234            size += e.getEdit().estimatedSerializedSizeOf();
+235          }
+236          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+237              entries.size() + " entries with total size " + size + " bytes to " +
+238              replicationClusterId);
+239          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+240            hfileArchiveDir);
+241          entriesCount += entries.size();
+242          batchCount++;
+243          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+244        } catch (IOException e) {
+245          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+246          throw e;
+247        }
+248      }
+249    }
+250
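+    // Hold off shipping while the test has the endpoint paused; once resume() fires, delegate to
+    // the real endpoint so all queued edits are replicated.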
+251    @Override
+252    public boolean replicate(ReplicateContext replicateContext) {
+253      try {
+254        await();
+255      } catch (InterruptedException e) {
+256        LOG.warn("Interrupted waiting for latch", e);
+257      }
+258      return super.replicate(replicateContext);
+259    }
+260
+261    @Override
+262    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+263      return new ReplicatorForTest(entries, ordinal);
+264    }
+265  }
+266
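+  // Variant whose Replicator routes sink RPCs through FailureInjectingBlockingInterface, so every
+  // other replicateWALEntry call fails and the retry path is exercised.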
+267  public static class FailureInjectingReplicationEndpointForTest
+268      extends ReplicationEndpointForTest {
+269
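+    // Pass-through wrapper around the sink's AdminService stub; only replicateWALEntry is altered.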
+270    static class FailureInjectingBlockingInterface implements BlockingInterface {
+271
+272      private final BlockingInterface delegate;
+273      private static volatile boolean failNext; // static: a new wrapper is created per batch attempt, so an instance flag would never inject a failure
+274
+275      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+276        this.delegate = delegate;
+277      }
+278
+279      @Override
+280      public GetRegionInfoResponse getRegionInfo(RpcController controller,
+281          GetRegionInfoRequest request) throws ServiceException {
+282        return delegate.getRegionInfo(controller, request);
+283      }
+284
+285      @Override
+286      public GetStoreFileResponse getStoreFile(RpcController controller,
+287          GetStoreFileRequest request) throws ServiceException {
+288        return delegate.getStoreFile(controller, request);
+289      }
+290
+291      @Override
+292      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+293          GetOnlineRegionRequest request) throws ServiceException {
+294        return delegate.getOnlineRegion(controller, request);
+295      }
+296
+297      @Override
+298      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+299          throws ServiceException {
+300        return delegate.openRegion(controller, request);
+301      }
+302
+303      @Override
+304      public WarmupRegionResponse warmupRegion(RpcController controller,
+305          WarmupRegionRequest request) throws ServiceException {
+306        return delegate.warmupRegion(controller, request);
+307      }
+308
+309      @Override
+310      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+311          throws ServiceException {
+312        return delegate.closeRegion(controller, request);
+313      }
+314
+315      @Override
+316      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+317          throws ServiceException {
+318        return delegate.flushRegion(controller, request);
+319      }
+320
+321      @Override
+322      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+323          throws ServiceException {
+324        return delegate.splitRegion(controller, request);
+325      }
+326
+327      @Override
+328      public CompactRegionResponse compactRegion(RpcController controller,
+329          CompactRegionRequest request) throws ServiceException {
+330        return delegate.compactRegion(controller, request);
+331      }
+332
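+      // Every other replicateWALEntry call across batch attempts is failed with an injected
+      // ServiceException, forcing the replicator to retry.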
+333      @Override
+334      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+335          ReplicateWALEntryRequest request) throws ServiceException {
+336        if (!failNext) {
+337          failNext = true;
+338          return delegate.replicateWALEntry(controller, request);
+339        } else {
+340          failNext = false;
+341          throw new ServiceException("Injected failure");
+342        }
+343      }
+344
+345      @Override
+346      public ReplicateWALEntryResponse replay(RpcController controller,
+347          ReplicateWALEntryRequest request) throws ServiceException {
+348        return delegate.replay(controller, request);
+349      }
+350
+351      @Override
+352      public RollWALWriterResponse rollWALWriter(RpcController controller,
+353          RollWALWriterRequest request) throws ServiceException {
+354        return delegate.rollWALWriter(controller, request);
+355      }
+356
+357      @Override
+358      public GetServerInfoResponse getServerInfo(RpcController controller,
+359          GetServerInfoRequest request) throws ServiceException {
+360        return delegate.getServerInfo(controller, request);
+361      }
+362
+363      @Override
+364      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+365          throws ServiceException {
+366        return delegate.stopServer(controller, request);
+367      }
+368
+369      @Override
+370      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+371          UpdateFavoredNodesRequest request) throws ServiceException {
+372        return delegate.updateFavoredNodes(controller, request);
+373      }
+374
+375      @Override
+376      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+377          UpdateConfigurationRequest request) throws ServiceException {
+378        return delegate.updateConfiguration(controller, request);
+379      }
+380
+381      @Override
+382      public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(RpcController controller,
+383          CloseRegionForSplitOrMergeRequest request) throws ServiceException {
+384        return delegate.closeRegionForSplitOrMerge(controller, request);
+385      }
+386
+387      @Override
+388      public GetRegionLoadResponse getRegionLoad(RpcController controller,
+389          GetRegionLoadRequest request) throws ServiceException {
+390        return delegate.getRegionLoad(controller, request);
+391      }
+392
+393      @Override
+394      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
+395          ClearCompactionQueuesRequest request) throws ServiceException {
+396        return delegate.clearCompactionQueues(controller, request);
+397      }
+398
+399      @Override
+400      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+401          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+402        return delegate.getSpaceQuotaSnapshots(controller, request);
+403      }
+404    }
+405
+406    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+407
+408      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+409        super(entries, ordinal);
+410      }
+411
+412      @Override
+413      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+414          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+415          throws IOException {
+416        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+417          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+418      }
+419    }
+420
+421    @Override
+422    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+423      return new FailureInjectingReplicatorForTest(entries, ordinal);
+424    }
+425  }
+426
+427}