View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io.hadoopbackport;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  
/**
 * The ThrottledInputStream provides bandwidth throttling on a specified
 * InputStream. It is implemented as a wrapper on top of another InputStream
 * instance.
 * The throttling works by examining the number of bytes read from the underlying
 * InputStream from the beginning, and sleep()ing in short intervals for as long
 * as the byte-transfer rate is found to exceed the specified tolerable maximum.
 * (Thus, while the read-rate might exceed the maximum for a given short interval,
 * the average tends towards the specified maximum, overall.)
 * <p>
 * The counters kept by this class are plain fields and no synchronization is
 * performed on them.
 */
public class ThrottledInputStream extends InputStream {

  /** Length of a single throttling pause, in milliseconds. */
  private static final long SLEEP_DURATION_MS = 50;

  private final InputStream rawStream;
  private final long maxBytesPerSec;
  private final long startTime = System.currentTimeMillis();

  private long bytesRead = 0;
  private long totalSleepTime = 0;

  /**
   * Wraps the given stream with a limit of {@code Long.MAX_VALUE} bytes/sec,
   * which the measured rate can never exceed.
   * @param rawStream The stream to read from.
   */
  public ThrottledInputStream(InputStream rawStream) {
    this(rawStream, Long.MAX_VALUE);
  }

  /**
   * Wraps the given stream and limits the average read rate.
   * @param rawStream The stream to read from.
   * @param maxBytesPerSec The maximum tolerated average rate, in bytes/sec.
   *                       Must be greater than zero.
   * @throws IllegalArgumentException if maxBytesPerSec is not positive.
   */
  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
    // Checked unconditionally (not with assert): a non-positive limit could
    // otherwise make throttle() wait indefinitely when assertions are disabled.
    if (maxBytesPerSec <= 0) {
      throw new IllegalArgumentException("Bandwidth " + maxBytesPerSec + " is invalid");
    }
    this.rawStream = rawStream;
    this.maxBytesPerSec = maxBytesPerSec;
  }

  /**
   * Closes the wrapped stream.
   * @throws IOException if closing the wrapped stream fails.
   */
  @Override
  public void close() throws IOException {
    rawStream.close();
  }

  /**
   * Reads a single byte from the wrapped stream, pausing first if the average
   * read rate is above the limit.
   * @return The byte read, or -1 at end of stream.
   * @throws IOException if the wrapped stream fails or the pause is interrupted.
   */
  @Override
  public int read() throws IOException {
    throttle();
    int data = rawStream.read();
    if (data != -1) {
      bytesRead++;
    }
    return data;
  }

  /**
   * Reads into the given buffer from the wrapped stream, pausing first if the
   * average read rate is above the limit.
   * @param b The destination buffer.
   * @return The number of bytes read, or -1 at end of stream.
   * @throws IOException if the wrapped stream fails or the pause is interrupted.
   */
  @Override
  public int read(byte[] b) throws IOException {
    throttle();
    return recordRead(rawStream.read(b));
  }

  /**
   * Reads up to {@code len} bytes into the given buffer from the wrapped
   * stream, pausing first if the average read rate is above the limit.
   * @param b The destination buffer.
   * @param off The offset in {@code b} at which to start storing bytes.
   * @param len The maximum number of bytes to read.
   * @return The number of bytes read, or -1 at end of stream.
   * @throws IOException if the wrapped stream fails or the pause is interrupted.
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    throttle();
    return recordRead(rawStream.read(b, off, len));
  }

  /** Adds a bulk-read result to the byte counter, ignoring the -1 end-of-stream marker. */
  private int recordRead(int readLen) {
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /**
   * Blocks in SLEEP_DURATION_MS slices until the average read rate is no
   * longer above the limit.
   * @throws IOException if the thread is interrupted while sleeping; the
   *                     thread's interrupt status is restored before throwing.
   */
  private void throttle() throws IOException {
    // Loop rather than sleeping once: a single 50 ms pause per read call
    // cannot hold large reads down to a small limit.
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        // Thread.sleep cleared the flag; set it again so callers can still
        // observe the interruption.
        Thread.currentThread().interrupt();
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Getter for the number of bytes read from this stream, since creation.
   * @return The number of bytes.
   */
  public long getTotalBytesRead() {
    return bytesRead;
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart, where the elapsed time is
   * truncated to whole seconds. While less than one full second has elapsed,
   * the number of bytes read so far is returned.
   * @return Read rate, in bytes/sec.
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

  /**
   * Getter for the total time spent in sleep.
   * @return Number of milliseconds spent in sleep.
   */
  public long getTotalSleepTime() {
    return totalSleepTime;
  }

  /**
   * Describes this stream by its counters and configured limit.
   * @return A string containing bytesRead, maxBytesPerSec, the current rate
   *         and totalSleepTime.
   */
  @Override
  public String toString() {
    return "ThrottledInputStream{" +
        "bytesRead=" + bytesRead +
        ", maxBytesPerSec=" + maxBytesPerSec +
        ", bytesPerSec=" + getBytesPerSec() +
        ", totalSleepTime=" + totalSleepTime +
        '}';
  }
}