hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1348285 - /hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
Date Sat, 09 Jun 2012 00:42:02 GMT
Author: eli
Date: Sat Jun  9 00:42:02 2012
New Revision: 1348285

URL: http://svn.apache.org/viewvc?rev=1348285&view=rev
Log:
Add missing file from previous commit.

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java?rev=1348285&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
(added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
Sat Jun  9 00:42:02 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+/** 
+ * a class to throttle the data transfers.
+ * This class is thread safe. It can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * threads.
+ */
+public class DataTransferThrottler {
+  private long period;          // period over which bw is imposed
+  private long periodExtension; // Max period over which bw accumulates.
+  private long bytesPerPeriod; // total number of bytes can be sent in each period
+  private long curPeriodStart; // current period starting time
+  private long curReserve;     // remaining bytes can be sent in the period
+  private long bytesAlreadyUsed;
+
+  /** Constructor 
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  public DataTransferThrottler(long bandwidthPerSec) {
+    this(500, bandwidthPerSec);  // by default throttling period is 500ms 
+  }
+
+  /**
+   * Constructor
+   * @param period in milliseconds. Bandwidth is enforced over this
+   *        period.
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  public DataTransferThrottler(long period, long bandwidthPerSec) {
+    this.curPeriodStart = System.currentTimeMillis();
+    this.period = period;
+    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+    this.periodExtension = period*3;
+  }
+
+  /**
+   * @return current throttle bandwidth in bytes per second.
+   */
+  public synchronized long getBandwidth() {
+    return bytesPerPeriod*1000/period;
+  }
+  
+  /**
+   * Sets throttle bandwidth. This takes affect latest by the end of current
+   * period.
+   * 
+   * @param bytesPerSecond 
+   */
+  public synchronized void setBandwidth(long bytesPerSecond) {
+    if ( bytesPerSecond <= 0 ) {
+      throw new IllegalArgumentException("" + bytesPerSecond);
+    }
+    bytesPerPeriod = bytesPerSecond*period/1000;
+  }
+  
+  /** Given the numOfBytes sent/received since last time throttle was called,
+   * make the current thread sleep if I/O rate is too fast
+   * compared to the given bandwidth.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since last time throttle was called
+   */
+  public synchronized void throttle(long numOfBytes) {
+    if ( numOfBytes <= 0 ) {
+      return;
+    }
+
+    curReserve -= numOfBytes;
+    bytesAlreadyUsed += numOfBytes;
+
+    while (curReserve <= 0) {
+      long now = System.currentTimeMillis();
+      long curPeriodEnd = curPeriodStart + period;
+
+      if ( now < curPeriodEnd ) {
+        // Wait for next period so that curReserve can be increased.
+        try {
+          wait( curPeriodEnd - now );
+        } catch (InterruptedException ignored) {}
+      } else if ( now <  (curPeriodStart + periodExtension)) {
+        curPeriodStart = curPeriodEnd;
+        curReserve += bytesPerPeriod;
+      } else {
+        // discard the prev period. Throttler might not have
+        // been used for a long time.
+        curPeriodStart = now;
+        curReserve = bytesPerPeriod - bytesAlreadyUsed;
+      }
+    }
+
+    bytesAlreadyUsed -= numOfBytes;
+  }
+}



Mime
View raw message