1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.concurrent.atomic.AtomicLong;
23
24 import org.apache.commons.lang.math.RandomUtils;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HRegionLocation;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
34
35
36 public class MultiThreadedReader extends MultiThreadedAction
37 {
38 private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
39
40 protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
41 private final double verifyPercent;
42 private volatile boolean aborted;
43
44 protected MultiThreadedWriterBase writer = null;
45
46
47
48
49
50
51 private final AtomicLong numUniqueKeysVerified = new AtomicLong();
52
53
54
55
56
57 public static final int DEFAULT_MAX_ERRORS = 10;
58
59
60
61
62
63
64
65 public static final int DEFAULT_KEY_WINDOW = 0;
66
67 protected AtomicLong numKeysVerified = new AtomicLong(0);
68 protected AtomicLong numReadErrors = new AtomicLong(0);
69 protected AtomicLong numReadFailures = new AtomicLong(0);
70 protected AtomicLong nullResult = new AtomicLong(0);
71
72 private int maxErrors = DEFAULT_MAX_ERRORS;
73 private int keyWindow = DEFAULT_KEY_WINDOW;
74
75 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
76 TableName tableName, double verifyPercent) {
77 super(dataGen, conf, tableName, "R");
78 this.verifyPercent = verifyPercent;
79 }
80
81 public void linkToWriter(MultiThreadedWriterBase writer) {
82 this.writer = writer;
83 writer.setTrackWroteKeys(true);
84 }
85
86 public void setMaxErrors(int maxErrors) {
87 this.maxErrors = maxErrors;
88 }
89
90 public void setKeyWindow(int keyWindow) {
91 this.keyWindow = keyWindow;
92 }
93
94 @Override
95 public void start(long startKey, long endKey, int numThreads) throws IOException {
96 super.start(startKey, endKey, numThreads);
97 if (verbose) {
98 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
99 }
100
101 addReaderThreads(numThreads);
102 startThreads(readers);
103 }
104
105 protected void addReaderThreads(int numThreads) throws IOException {
106 for (int i = 0; i < numThreads; ++i) {
107 HBaseReaderThread reader = new HBaseReaderThread(i);
108 readers.add(reader);
109 }
110 }
111
112 public class HBaseReaderThread extends Thread {
113 protected final int readerId;
114 protected final HTable table;
115
116
117 private long curKey;
118
119
120 protected long startTimeMs;
121
122
123 private boolean readingRandomKey;
124
125
126
127
128
129 public HBaseReaderThread(int readerId) throws IOException {
130 this.readerId = readerId;
131 table = createTable();
132 setName(getClass().getSimpleName() + "_" + readerId);
133 }
134
135 protected HTable createTable() throws IOException {
136 return new HTable(conf, tableName);
137 }
138
139 @Override
140 public void run() {
141 try {
142 runReader();
143 } finally {
144 closeTable();
145 numThreadsWorking.decrementAndGet();
146 }
147 }
148
149 protected void closeTable() {
150 try {
151 if (table != null) {
152 table.close();
153 }
154 } catch (IOException e) {
155 LOG.error("Error closing table", e);
156 }
157 }
158
159 private void runReader() {
160 if (verbose) {
161 LOG.info("Started thread #" + readerId + " for reads...");
162 }
163
164 startTimeMs = System.currentTimeMillis();
165 curKey = startKey;
166 while (curKey < endKey && !aborted) {
167 long k = getNextKeyToRead();
168
169
170 if (k < startKey || k >= endKey) {
171 numReadErrors.incrementAndGet();
172 throw new AssertionError("Load tester logic error: proposed key " +
173 "to read " + k + " is out of range (startKey=" + startKey +
174 ", endKey=" + endKey + ")");
175 }
176
177 if (k % numThreads != readerId ||
178 writer != null && writer.failedToWriteKey(k)) {
179
180
181 continue;
182 }
183
184 readKey(k);
185 if (k == curKey - 1 && !readingRandomKey) {
186
187 numUniqueKeysVerified.incrementAndGet();
188 }
189 }
190 }
191
192
193
194
195
196
197 private long maxKeyWeCanRead() {
198 long insertedUpToKey = writer.wroteUpToKey();
199 if (insertedUpToKey >= endKey - 1) {
200
201
202 return endKey - 1;
203 }
204 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
205 }
206
207 private long getNextKeyToRead() {
208 readingRandomKey = false;
209 if (writer == null || curKey <= maxKeyWeCanRead()) {
210 return curKey++;
211 }
212
213
214 long maxKeyToRead;
215 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
216
217
218
219 Threads.sleepWithoutInterrupt(50);
220 }
221
222 if (curKey <= maxKeyToRead) {
223
224
225 return curKey++;
226 }
227
228
229
230
231
232 readingRandomKey = true;
233 return startKey + Math.abs(RandomUtils.nextLong())
234 % (maxKeyToRead - startKey + 1);
235 }
236
237 private Get readKey(long keyToRead) {
238 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
239 String cfsString = "";
240 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
241 for (byte[] cf : columnFamilies) {
242 get.addFamily(cf);
243 if (verbose) {
244 if (cfsString.length() > 0) {
245 cfsString += ", ";
246 }
247 cfsString += "[" + Bytes.toStringBinary(cf) + "]";
248 }
249 }
250
251 try {
252 get = dataGenerator.beforeGet(keyToRead, get);
253 if (verbose) {
254 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
255 }
256 queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
257 } catch (IOException e) {
258 numReadFailures.addAndGet(1);
259 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
260 + ", time from start: "
261 + (System.currentTimeMillis() - startTimeMs) + " ms");
262 }
263 return get;
264 }
265
266 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
267 String rowKey = Bytes.toString(get.getRow());
268
269
270 long start = System.currentTimeMillis();
271 Result result = table.get(get);
272 getResultMetricUpdation(verify, rowKey, start, result, table, false);
273 }
274
275 protected void getResultMetricUpdation(boolean verify, String rowKey, long start,
276 Result result, HTable table, boolean isNullExpected)
277 throws IOException {
278 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
279 numKeys.addAndGet(1);
280 if (!result.isEmpty()) {
281 if (verify) {
282 numKeysVerified.incrementAndGet();
283 }
284 } else {
285 HRegionLocation hloc = table.getRegionLocation(
286 Bytes.toBytes(rowKey));
287 LOG.info("Key = " + rowKey + ", RegionServer: "
288 + hloc.getHostname());
289 if(isNullExpected) {
290 nullResult.incrementAndGet();
291 LOG.debug("Null result obtained for the key ="+rowKey);
292 return;
293 }
294 }
295 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
296 long numErrorsAfterThis = 0;
297 if (isOk) {
298 long cols = 0;
299
300 for (byte[] cf : result.getMap().keySet()) {
301 cols += result.getFamilyMap(cf).size();
302 }
303 numCols.addAndGet(cols);
304 } else {
305 if (writer != null) {
306 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
307 }
308 numErrorsAfterThis = numReadErrors.incrementAndGet();
309 }
310
311 if (numErrorsAfterThis > maxErrors) {
312 LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
313 aborted = true;
314 }
315 }
316 }
317
318 public long getNumReadFailures() {
319 return numReadFailures.get();
320 }
321
322 public long getNumReadErrors() {
323 return numReadErrors.get();
324 }
325
326 public long getNumKeysVerified() {
327 return numKeysVerified.get();
328 }
329
330 public long getNumUniqueKeysVerified() {
331 return numUniqueKeysVerified.get();
332 }
333
334 public long getNullResultsCount() {
335 return nullResult.get();
336 }
337
338 @Override
339 protected String progressInfo() {
340 StringBuilder sb = new StringBuilder();
341 appendToStatus(sb, "verified", numKeysVerified.get());
342 appendToStatus(sb, "READ FAILURES", numReadFailures.get());
343 appendToStatus(sb, "READ ERRORS", numReadErrors.get());
344 appendToStatus(sb, "NULL RESULT", nullResult.get());
345 return sb.toString();
346 }
347 }