1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hadoopbackport;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23
24
25
26
27
28
29
30
31
32
33
/**
 * An {@link InputStream} wrapper that limits the average rate at which bytes
 * are read from the wrapped stream.
 *
 * <p>Before every read the average rate since construction is compared with
 * the configured maximum; while the average is above the maximum the calling
 * thread sleeps in {@value #SLEEP_DURATION_MS} ms steps.
 *
 * <p>The counters are plain fields updated without any synchronization.
 */
public class ThrottledInputStream extends InputStream {

  /** Length of one pause, in milliseconds, while the read rate is over the limit. */
  private static final long SLEEP_DURATION_MS = 50;

  private final InputStream rawStream;
  private final long maxBytesPerSec;
  /** Wall-clock time at construction; the average rate is measured from here. */
  private final long startTime = System.currentTimeMillis();

  private long bytesRead = 0;
  private long totalSleepTime = 0;

  /**
   * Wraps {@code rawStream} with a limit of {@link Long#MAX_VALUE} bytes per
   * second, a limit the measured rate can never exceed.
   *
   * @param rawStream the stream to read from
   */
  public ThrottledInputStream(InputStream rawStream) {
    this(rawStream, Long.MAX_VALUE);
  }

  /**
   * Wraps {@code rawStream}, limiting reads to {@code maxBytesPerSec} on average.
   *
   * @param rawStream the stream to read from
   * @param maxBytesPerSec maximum average read rate in bytes per second; must be positive
   * @throws IllegalArgumentException if {@code maxBytesPerSec} is not positive
   */
  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
    // An explicit check instead of `assert`: assertions are normally disabled at
    // runtime, and a non-positive limit could never be satisfied by throttle().
    if (maxBytesPerSec <= 0) {
      throw new IllegalArgumentException("Bandwidth " + maxBytesPerSec + " is invalid");
    }
    this.rawStream = rawStream;
    this.maxBytesPerSec = maxBytesPerSec;
  }

  /**
   * Closes the wrapped stream.
   *
   * @throws IOException if closing the wrapped stream fails
   */
  @Override
  public void close() throws IOException {
    rawStream.close();
  }

  /**
   * Reads a single byte, first waiting until the average rate is within the limit.
   *
   * @return the byte read, or {@code -1} at end of stream
   * @throws IOException if the wrapped stream fails or the wait is interrupted
   */
  @Override
  public int read() throws IOException {
    throttle();
    int data = rawStream.read();
    if (data != -1) {
      bytesRead++;
    }
    return data;
  }

  /**
   * Reads into {@code b}, first waiting until the average rate is within the limit.
   *
   * @param b destination buffer
   * @return the number of bytes read, or {@code -1} at end of stream
   * @throws IOException if the wrapped stream fails or the wait is interrupted
   */
  @Override
  public int read(byte[] b) throws IOException {
    throttle();
    int readLen = rawStream.read(b);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /**
   * Reads up to {@code len} bytes into {@code b} starting at {@code off}, first
   * waiting until the average rate is within the limit.
   *
   * @param b destination buffer
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to read
   * @return the number of bytes read, or {@code -1} at end of stream
   * @throws IOException if the wrapped stream fails or the wait is interrupted
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    throttle();
    int readLen = rawStream.read(b, off, len);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  /**
   * Blocks the calling thread until the average read rate is no longer above
   * the configured maximum.
   *
   * @throws IOException if the thread is interrupted while sleeping; the
   *         thread's interrupt status is restored before the exception is thrown
   */
  private void throttle() throws IOException {
    // Loop rather than sleep once: a single large read can push the average far
    // above the limit, and one fixed pause would not bring it back down.
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; set it again
        // so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Returns the number of bytes read through this stream so far.
   *
   * @return total bytes read; end-of-stream results are not counted
   */
  public long getTotalBytesRead() {
    return bytesRead;
  }

  /**
   * Returns the average read rate since this stream was constructed.
   *
   * <p>Elapsed time is truncated to whole seconds, so during the first second
   * the value returned is simply the total number of bytes read.
   *
   * @return average bytes read per second
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

  /**
   * Returns the total time this stream has spent sleeping to stay within the limit.
   *
   * @return accumulated sleep time in milliseconds
   */
  public long getTotalSleepTime() {
    return totalSleepTime;
  }

  /** Returns a summary of the byte count, the limit, the current rate and the sleep time. */
  @Override
  public String toString() {
    return "ThrottledInputStream{" +
        "bytesRead=" + bytesRead +
        ", maxBytesPerSec=" + maxBytesPerSec +
        ", bytesPerSec=" + getBytesPerSec() +
        ", totalSleepTime=" + totalSleepTime +
        '}';
  }
}