1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Callable;
28
29 import org.apache.commons.lang.exception.ExceptionUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.commons.logging.impl.Log4JLogger;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.CoordinatedStateManager;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HRegionInfo;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.HTestConst;
44 import org.apache.hadoop.hbase.KeyValue;
45 import org.apache.hadoop.hbase.KeyValue.KVComparator;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.ResultScanner;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.client.ScannerCallable;
52 import org.apache.hadoop.hbase.client.Table;
53 import org.apache.hadoop.hbase.filter.Filter;
54 import org.apache.hadoop.hbase.filter.FilterBase;
55 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
56 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
57 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
58 import org.apache.hadoop.hbase.testclassification.MediumTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.wal.WAL;
61 import org.apache.log4j.Level;
62 import org.junit.After;
63 import org.junit.AfterClass;
64 import org.junit.Before;
65 import org.junit.BeforeClass;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68
69 import com.google.protobuf.RpcController;
70 import com.google.protobuf.ServiceException;
71
72
73
74
75
76
77
78
79
80 @Category(MediumTests.class)
81 public class TestScannerHeartbeatMessages {
82 private static final Log LOG = LogFactory.getLog(TestScannerHeartbeatMessages.class);
83
84 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
85
86 private static Table TABLE = null;
87
88
89
90
91 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
92
93 private static int NUM_ROWS = 5;
94 private static byte[] ROW = Bytes.toBytes("testRow");
95 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
96
97 private static int NUM_FAMILIES = 3;
98 private static byte[] FAMILY = Bytes.toBytes("testFamily");
99 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
100
101 private static int NUM_QUALIFIERS = 3;
102 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
103 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
104
105 private static int VALUE_SIZE = 128;
106 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
107
108
109
110
111 private static int CLIENT_TIMEOUT = 2000;
112
113
114 private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
115
116
117 private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
118
119
120 private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES;
121
122 @BeforeClass
123 public static void setUpBeforeClass() throws Exception {
124 ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
125 ((Log4JLogger) HeartbeatRPCServices.LOG).getLogger().setLevel(Level.ALL);
126 Configuration conf = TEST_UTIL.getConfiguration();
127
128 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
129 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
130 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
131 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
132 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
133
134
135 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
136 TEST_UTIL.startMiniCluster(1);
137
138 TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
139 }
140
141 static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
142 byte[][] qualifiers, byte[] cellValue) throws IOException {
143 Table ht = TEST_UTIL.createTable(name, families);
144 List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
145 ht.put(puts);
146
147 return ht;
148 }
149
150
151
152
153
154
155
156
157
158
159 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
160 byte[] value) throws IOException {
161 Put put;
162 ArrayList<Put> puts = new ArrayList<>();
163
164 for (int row = 0; row < rows.length; row++) {
165 put = new Put(rows[row]);
166 for (int fam = 0; fam < families.length; fam++) {
167 for (int qual = 0; qual < qualifiers.length; qual++) {
168 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
169 put.add(kv);
170 }
171 }
172 puts.add(put);
173 }
174
175 return puts;
176 }
177
178 @AfterClass
179 public static void tearDownAfterClass() throws Exception {
180 TEST_UTIL.deleteTable(TABLE_NAME);
181 TEST_UTIL.shutdownMiniCluster();
182 }
183
184 @Before
185 public void setupBeforeTest() throws Exception {
186 disableSleeping();
187 }
188
189 @After
190 public void teardownAfterTest() throws Exception {
191 disableSleeping();
192 }
193
194
195
196
197
198
199
200 @Test
201 public void testScannerHeartbeatMessages() throws Exception {
202 testImportanceOfHeartbeats(testHeartbeatBetweenRows());
203 testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
204 testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
205 }
206
207
208
209
210
211
212
213
214 public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
215 HeartbeatRPCServices.heartbeatsEnabled = true;
216
217 try {
218 testCallable.call();
219 } catch (Exception e) {
220 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
221 + ExceptionUtils.getStackTrace(e));
222 }
223
224 HeartbeatRPCServices.heartbeatsEnabled = false;
225 try {
226 testCallable.call();
227 } catch (Exception e) {
228 return;
229 } finally {
230 HeartbeatRPCServices.heartbeatsEnabled = true;
231 }
232 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
233 + " is not thrown, the test case is not testing the importance of heartbeat messages");
234 }
235
236
237
238
239
240
241 public Callable<Void> testHeartbeatBetweenRows() throws Exception {
242 return new Callable<Void>() {
243
244 @Override
245 public Void call() throws Exception {
246
247
248 Scan scan = new Scan();
249 scan.setMaxResultSize(Long.MAX_VALUE);
250 scan.setCaching(Integer.MAX_VALUE);
251
252 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
253 return null;
254 }
255 };
256 }
257
258
259
260
261
262 public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
263 return new Callable<Void>() {
264 @Override
265 public Void call() throws Exception {
266
267
268 Scan baseScan = new Scan();
269 baseScan.setMaxResultSize(Long.MAX_VALUE);
270 baseScan.setCaching(Integer.MAX_VALUE);
271
272
273
274 Scan scanCopy = new Scan(baseScan);
275 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
276 scanCopy = new Scan(baseScan);
277 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
278 return null;
279 }
280 };
281 }
282
283 public static class SparseFilter extends FilterBase{
284
285 @Override
286 public ReturnCode filterKeyValue(Cell v) throws IOException {
287 try {
288 Thread.sleep(SERVER_TIME_LIMIT + 10);
289 } catch (InterruptedException e) {
290 Thread.currentThread().interrupt();
291 }
292 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
293 ReturnCode.INCLUDE :
294 ReturnCode.SKIP;
295 }
296
297 public static Filter parseFrom(final byte [] pbBytes){
298 return new SparseFilter();
299 }
300 }
301
302
303
304
305
306 public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
307 return new Callable<Void>() {
308 @Override
309 public Void call() throws Exception {
310 Scan scan = new Scan();
311 scan.setMaxResultSize(Long.MAX_VALUE);
312 scan.setCaching(Integer.MAX_VALUE);
313 scan.setFilter(new SparseFilter());
314 ResultScanner scanner = TABLE.getScanner(scan);
315 int num = 0;
316 while (scanner.next() != null) {
317 num++;
318 }
319 assertEquals(1, num);
320 scanner.close();
321
322 scan = new Scan();
323 scan.setMaxResultSize(Long.MAX_VALUE);
324 scan.setCaching(Integer.MAX_VALUE);
325 scan.setFilter(new SparseFilter());
326 scan.setAllowPartialResults(true);
327 scanner = TABLE.getScanner(scan);
328 num = 0;
329 while (scanner.next() != null) {
330 num++;
331 }
332 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
333 scanner.close();
334
335 return null;
336 }
337 };
338 }
339
340
341
342
343
344
345
346
347
348
349
350 public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
351 int cfSleepTime, boolean sleepBeforeCf) throws Exception {
352 disableSleeping();
353 final ResultScanner scanner = TABLE.getScanner(scan);
354 final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
355
356 Result r1 = null;
357 Result r2 = null;
358
359 while ((r1 = scanner.next()) != null) {
360
361 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
362 r2 = scannerWithHeartbeats.next();
363 disableSleeping();
364
365 assertTrue(r2 != null);
366 try {
367 Result.compareResults(r1, r2);
368 } catch (Exception e) {
369 fail(e.getMessage());
370 }
371 }
372
373 assertTrue(scannerWithHeartbeats.next() == null);
374 scanner.close();
375 scannerWithHeartbeats.close();
376 }
377
378
379
380
381
382
383
384 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
385 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
386 HeartbeatHRegion.rowSleepTime = rowSleepTime;
387
388 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
389 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
390 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
391 }
392
393
394
395
396 private static void disableSleeping() {
397 HeartbeatHRegion.sleepBetweenRows = false;
398 HeartbeatHRegion.sleepBetweenColumnFamilies = false;
399 }
400
401
402
403
404
405 private static class HeartbeatHRegionServer extends HRegionServer {
406 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
407 super(conf);
408 }
409
410 public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
411 throws IOException, InterruptedException {
412 super(conf, csm);
413 }
414
415 @Override
416 protected RSRpcServices createRpcServices() throws IOException {
417 return new HeartbeatRPCServices(this);
418 }
419 }
420
421
422
423
424 private static class HeartbeatRPCServices extends RSRpcServices {
425 private static boolean heartbeatsEnabled = true;
426
427 public HeartbeatRPCServices(HRegionServer rs) throws IOException {
428 super(rs);
429 }
430
431 @Override
432 public ScanResponse scan(RpcController controller, ScanRequest request)
433 throws ServiceException {
434 ScanRequest.Builder builder = ScanRequest.newBuilder(request);
435 builder.setClientHandlesHeartbeats(heartbeatsEnabled);
436 return super.scan(controller, builder.build());
437 }
438 }
439
440
441
442
443
444
445 private static class HeartbeatHRegion extends HRegion {
446
447 private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
448 private static boolean sleepBetweenRows = false;
449
450
451
452
453
454
455 private static boolean sleepBeforeColumnFamily = false;
456 private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
457 private static boolean sleepBetweenColumnFamilies = false;
458
459 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
460 HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
461 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
462 }
463
464 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
465 HTableDescriptor htd, RegionServerServices rsServices) {
466 super(fs, wal, confParam, htd, rsServices);
467 }
468
469 private static void columnFamilySleep() {
470 if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
471 try {
472 Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
473 } catch (InterruptedException e) {
474 }
475 }
476 }
477
478 private static void rowSleep() {
479 try {
480 if (HeartbeatHRegion.sleepBetweenRows) {
481 Thread.sleep(HeartbeatHRegion.rowSleepTime);
482 }
483 } catch (InterruptedException e) {
484 }
485 }
486
487
488 @Override
489 protected RegionScanner instantiateRegionScanner(Scan scan,
490 List<KeyValueScanner> additionalScanners) throws IOException {
491 if (scan.isReversed()) {
492 if (scan.getFilter() != null) {
493 scan.getFilter().setReversed(true);
494 }
495 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
496 }
497 return new HeartbeatRegionScanner(scan, additionalScanners, this);
498 }
499 }
500
501
502
503
504
505 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
506 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
507 HRegion region) throws IOException {
508 super(scan, additionalScanners, region);
509 }
510
511 @Override
512 public boolean nextRaw(List<Cell> outResults, ScannerContext context)
513 throws IOException {
514 boolean moreRows = super.nextRaw(outResults, context);
515 HeartbeatHRegion.rowSleep();
516 return moreRows;
517 }
518
519 @Override
520 protected void initializeKVHeap(List<KeyValueScanner> scanners,
521 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
522 this.storeHeap = new HeartbeatReversedKVHeap(scanners, region.getComparator());
523 if (!joinedScanners.isEmpty()) {
524 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, region.getComparator());
525 }
526 }
527 }
528
529
530
531
532
533 private static class HeartbeatRegionScanner extends RegionScannerImpl {
534 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
535 throws IOException {
536 region.super(scan, additionalScanners, region);
537 }
538
539 @Override
540 public boolean nextRaw(List<Cell> outResults, ScannerContext context)
541 throws IOException {
542 boolean moreRows = super.nextRaw(outResults, context);
543 HeartbeatHRegion.rowSleep();
544 return moreRows;
545 }
546
547 @Override
548 protected void initializeKVHeap(List<KeyValueScanner> scanners,
549 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
550 this.storeHeap = new HeartbeatKVHeap(scanners, region.getComparator());
551 if (!joinedScanners.isEmpty()) {
552 this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getComparator());
553 }
554 }
555 }
556
557
558
559
560
561 private static final class HeartbeatKVHeap extends KeyValueHeap {
562 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator)
563 throws IOException {
564 super(scanners, comparator);
565 }
566
567 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
568 throws IOException {
569 super(scanners, comparator);
570 }
571
572 @Override
573 public boolean next(List<Cell> result, ScannerContext context)
574 throws IOException {
575 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
576 boolean moreRows = super.next(result, context);
577 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
578 return moreRows;
579 }
580 }
581
582
583
584
585
586 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
587 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
588 KVComparator comparator) throws IOException {
589 super(scanners, comparator);
590 }
591
592 @Override
593 public boolean next(List<Cell> result, ScannerContext context)
594 throws IOException {
595 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
596 boolean moreRows = super.next(result, context);
597 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
598 return moreRows;
599 }
600 }
601 }