Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AEBC210F9E for ; Tue, 11 Feb 2014 00:48:53 +0000 (UTC) Received: (qmail 87442 invoked by uid 500); 11 Feb 2014 00:48:52 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 87406 invoked by uid 500); 11 Feb 2014 00:48:52 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 87399 invoked by uid 99); 11 Feb 2014 00:48:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 00:48:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 00:48:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A21F523889BF; Tue, 11 Feb 2014 00:48:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1566922 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/replication/regionserver/ Date: Tue, 11 Feb 2014 00:48:26 -0000 To: commits@hbase.apache.org From: jdcryans@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140211004826.A21F523889BF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jdcryans Date: Tue Feb 11 00:48:26 2014 New Revision: 1566922 URL: http://svn.apache.org/r1566922 Log: HBASE-9501 Provide throttling for replication (Feng Honghua via JD) Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1566922&r1=1566921&r2=1566922&view=diff ============================================================================== --- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Feb 11 00:48:26 2014 @@ -131,6 +131,8 @@ public class ReplicationSource extends T private ReplicationSinkManager replicationSinkMgr; //WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; + // throttler + private ReplicationThrottler throttler; /** * Instantiation method used by region servers @@ -164,6 +166,8 @@ public class ReplicationSource extends T // replication and make replication specific settings such as compression or codec to use // passing Cells. this.conn = HConnectionManager.getConnection(this.conf); + long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); + this.throttler = new ReplicationThrottler((double)bandwidth/10.0); this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.manager = manager; @@ -598,6 +602,7 @@ public class ReplicationSource extends T Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping between retries"); + Thread.currentThread().interrupt(); } return sleepMultiplier < maxRetriesMultiplier; } @@ -661,6 +666,22 @@ public class ReplicationSource extends T } SinkPeer sinkPeer = null; try { + if (this.throttler.isEnabled()) { + long sleepTicks = this.throttler.getNextSleepInterval(currentSize); + if (sleepTicks > 0) { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("To sleep " + sleepTicks + "ms for throttling control"); + } + Thread.sleep(sleepTicks); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping for throttling control"); + Thread.currentThread().interrupt(); + } + // reset throttler's cycle start tick when sleep for throttling occurs + this.throttler.resetStartTick(); + } + } sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); if (LOG.isTraceEnabled()) { @@ -675,6 +696,9 @@ public class ReplicationSource extends T this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } + if (this.throttler.isEnabled()) { + this.throttler.addPushSize(currentSize); + } this.totalReplicatedEdits += entries.size(); this.totalReplicatedOperations += currentNbOperations; this.metrics.shipBatch(this.currentNbOperations); Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java?rev=1566922&view=auto ============================================================================== --- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java (added) +++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java Tue Feb 11 00:48:26 2014 @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Per-peer per-node throttling controller for replication: enabled if + * bandwidth > 0, a cycle = 100ms, by throttling we guarantee data pushed + * to peer within each cycle won't exceed 'bandwidth' bytes + */ +@InterfaceAudience.Private +public class ReplicationThrottler { + private final boolean enabled; + private final double bandwidth; + private long cyclePushSize; + private long cycleStartTick; + + /** + * ReplicationThrottler constructor + * If bandwidth less than 1, throttling is disabled + * @param bandwidth per cycle(100ms) + */ + public ReplicationThrottler(final double bandwidth) { + this.bandwidth = bandwidth; + this.enabled = this.bandwidth > 0; + if (this.enabled) { + this.cyclePushSize = 0; + this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis(); + } + } + + /** + * If throttling is enabled + * @return true if throttling is enabled + */ + public boolean isEnabled() { + return this.enabled; + } + + /** + * Get how long the caller should sleep according to the current size and + * current cycle's total push size and start tick, return the sleep interval + * for throttling control. + * @param size is the size of edits to be pushed + * @return sleep interval for throttling control + */ + public long getNextSleepInterval(final int size) { + if (!this.enabled) { + return 0; + } + + long sleepTicks = 0; + long now = EnvironmentEdgeManager.currentTimeMillis(); + // 1. if cyclePushSize exceeds bandwidth, we need to sleep some + // following cycles to amortize, this case can occur when a single push + // exceeds the bandwidth + if ((double)this.cyclePushSize > bandwidth) { + double cycles = Math.ceil((double)this.cyclePushSize / bandwidth); + long shouldTillTo = this.cycleStartTick + (long)(cycles * 100); + if (shouldTillTo > now) { + sleepTicks = shouldTillTo - now; + } else { + // no reset in shipEdits since no sleep, so we need to reset cycleStartTick here! + this.cycleStartTick = now; + } + this.cyclePushSize = 0; + } else { + long nextCycleTick = this.cycleStartTick + 100; //a cycle is 100ms + if (now >= nextCycleTick) { + // 2. switch to next cycle if the current cycle has passed + this.cycleStartTick = now; + this.cyclePushSize = 0; + } else if (this.cyclePushSize > 0 && + (double)(this.cyclePushSize + size) >= bandwidth) { + // 3. delay the push to next cycle if exceeds throttling bandwidth. + // enforcing cyclePushSize > 0 to avoid the unnecessary sleep for case + // where a cycle's first push size(currentSize) > bandwidth + sleepTicks = nextCycleTick - now; + this.cyclePushSize = 0; + } + } + return sleepTicks; + } + + /** + * Add current size to the current cycle's total push size + * @param size is the current size added to the current cycle's + * total push size + */ + public void addPushSize(final int size) { + if (this.enabled) { + this.cyclePushSize += size; + } + } + + /** + * Reset the cycle start tick to NOW + */ + public void resetStartTick() { + if (this.enabled) { + this.cycleStartTick = EnvironmentEdgeManager.currentTimeMillis(); + } + } +} Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java?rev=1566922&view=auto ============================================================================== --- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java (added) +++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java Tue Feb 11 00:48:26 2014 @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.SmallTests; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReplicationThrottler { + + private static final Log LOG = LogFactory.getLog(TestReplicationThrottler.class); + + /** + * unit test for throttling + */ + @Test(timeout=10000) + public void testThrottling() { + LOG.info("testThrottling"); + + // throttle bandwidth is 100 and 10 bytes/cycle respectively + ReplicationThrottler throttler1 = new ReplicationThrottler(100); + ReplicationThrottler throttler2 = new ReplicationThrottler(10); + + long ticks1 = throttler1.getNextSleepInterval(1000); + long ticks2 = throttler2.getNextSleepInterval(1000); + + // 1. the first push size is 1000, though 1000 bytes exceeds 100/10 + // bandwidthes, but no sleep since it's the first push of current + // cycle, amortizing occurs when next push arrives + assertEquals(0, ticks1); + assertEquals(0, ticks2); + + throttler1.addPushSize(1000); + throttler2.addPushSize(1000); + + ticks1 = throttler1.getNextSleepInterval(5); + ticks2 = throttler2.getNextSleepInterval(5); + + // 2. when the second push(5) arrives and throttling(5) is called, the + // current cyclePushSize is 1000 bytes, this should make throttler1 + // sleep 1000/100 = 10 cycles = 1s and make throttler2 sleep 1000/10 + // = 100 cycles = 10s before the second push occurs -- amortize case + // after amortizing, both cycleStartTick and cyclePushSize are reset + assertTrue(ticks1 == 1000 || ticks1 == 999); + assertTrue(ticks2 == 10000 || ticks2 == 9999); + + throttler1.resetStartTick(); + throttler2.resetStartTick(); + + throttler1.addPushSize(5); + throttler2.addPushSize(5); + + ticks1 = throttler1.getNextSleepInterval(45); + ticks2 = throttler2.getNextSleepInterval(45); + + // 3. when the third push(45) arrives and throttling(45) is called, the + // current cyclePushSize is 5 bytes, 50-byte makes throttler1 no + // sleep, but can make throttler2 delay to next cycle + // note: in real case, sleep time should cover time elapses during push + // operation + assertTrue(ticks1 == 0); + assertTrue(ticks2 == 100 || ticks2 == 99); + + throttler2.resetStartTick(); + + throttler1.addPushSize(45); + throttler2.addPushSize(45); + + ticks1 = throttler1.getNextSleepInterval(60); + ticks2 = throttler2.getNextSleepInterval(60); + + // 4. when the fourth push(60) arrives and throttling(60) is called, throttler1 + // delay to next cycle since 45+60 == 105; and throttler2 should firstly sleep + // ceiling(45/10)= 5 cycles = 500ms to amortize previous push + // note: in real case, sleep time should cover time elapses during push + // operation + assertTrue(ticks1 == 100 || ticks1 == 99); + assertTrue(ticks2 == 500 || ticks2 == 499); + } +}