hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1595946 - in /hbase/branches/0.98: hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/
Date Mon, 19 May 2014 16:16:33 GMT
Author: tedyu
Date: Mon May 19 16:16:33 2014
New Revision: 1595946

URL: http://svn.apache.org/r1595946
Log:
HBASE-11090 Backport HBASE-11083 ExportSnapshot should provide capability to limit bandwidth
consumption


Added:
    hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/
    hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java

Added: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java?rev=1595946&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java (added)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java Mon May 19 16:16:33 2014
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hadoopbackport;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottledInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing in fixed-length intervals for as
+ * long as the byte-transfer rate is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ * <p>
+ * The counters are plain fields without synchronization, so an instance must not
+ * be shared between threads without external locking.
+ */
+public class ThrottledInputStream extends InputStream {
+
+  /** The wrapped stream that all reads are delegated to. */
+  private final InputStream rawStream;
+  /** Upper bound for the average read rate, in bytes per second. Always positive. */
+  private final long maxBytesPerSec;
+  /** Wall-clock creation time (ms); the read rate is averaged from this instant. */
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  /** Length of a single throttling pause, in milliseconds. */
+  private static final long SLEEP_DURATION_MS = 50;
+
+  /**
+   * Wraps the given stream with an effectively unlimited bandwidth
+   * (Long.MAX_VALUE bytes/sec).
+   * @param rawStream The stream to read from.
+   */
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  /**
+   * Wraps the given stream, limiting the average read rate.
+   * @param rawStream The stream to read from.
+   * @param maxBytesPerSec Maximum average read rate, in bytes/sec. Must be positive.
+   * @throws IllegalArgumentException if maxBytesPerSec is zero or negative.
+   */
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    // Checked unconditionally (not with assert): a non-positive limit can never be
+    // satisfied, so throttle() would otherwise stall every read indefinitely.
+    if (maxBytesPerSec <= 0) {
+      throw new IllegalArgumentException("Bandwidth " + maxBytesPerSec + " is invalid");
+    }
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  /** Closes the wrapped stream. */
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /**
+   * Blocks the caller, in SLEEP_DURATION_MS steps, until the average read rate
+   * has dropped back to at most maxBytesPerSec.
+   * @throws IOException if the thread is interrupted while sleeping; the
+   *         interrupt status of the thread is restored before throwing.
+   */
+  private void throttle() throws IOException {
+    // Loop rather than sleep once: a single fixed pause is not enough to bring
+    // the average back under the limit when the caller reads in large chunks.
+    while (getBytesPerSec() > maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart, with the elapsed time
+   * truncated to whole seconds. During the first second the byte count itself
+   * is returned.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter for the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java?rev=1595946&r1=1595945&r2=1595946&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java Mon May 19 16:16:33 2014
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
@@ -84,6 +87,7 @@ public final class ExportSnapshot extend
   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
@@ -226,7 +230,11 @@ public final class ExportSnapshot extend
         }
       }
 
-      FSDataInputStream in = openSourceFile(context, inputPath);
+      InputStream in = openSourceFile(context, inputPath);
+      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
+      if (Integer.MAX_VALUE != bandwidthMB) {
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+      }
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
 
@@ -299,7 +307,7 @@ public final class ExportSnapshot extend
     }
 
     private void copyData(final Context context,
-        final Path inputPath, final FSDataInputStream in,
+        final Path inputPath, final InputStream in,
         final Path outputPath, final FSDataOutputStream out,
         final long inputFileSize)
         throws IOException {
@@ -586,7 +594,8 @@ public final class ExportSnapshot extend
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+      final int mappers, final int bandwidthMB)
+          throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -595,6 +604,7 @@ public final class ExportSnapshot extend
     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
     conf.setInt("mapreduce.job.maps", mappers);
+    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
 
     Job job = new Job(conf);
     job.setJobName("ExportSnapshot");
@@ -658,6 +668,7 @@ public final class ExportSnapshot extend
     String filesGroup = null;
     String filesUser = null;
     Path outputRoot = null;
+    int bandwidthMB = Integer.MAX_VALUE;
     int filesMode = 0;
     int mappers = 0;
 
@@ -688,6 +699,8 @@ public final class ExportSnapshot extend
           filesUser = args[++i];
         } else if (cmd.equals("-chgroup")) {
           filesGroup = args[++i];
+        } else if (cmd.equals("-bandwidth")) {
+          bandwidthMB = Integer.parseInt(args[++i]);
         } else if (cmd.equals("-chmod")) {
           filesMode = Integer.parseInt(args[++i], 8);
         } else if (cmd.equals("-overwrite")) {
@@ -799,7 +812,7 @@ public final class ExportSnapshot extend
         LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
       } else {
         runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
-                   filesUser, filesGroup, filesMode, mappers);
+                   filesUser, filesGroup, filesMode, mappers, bandwidthMB);
       }
 
 



Mime
View raw message