1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
55
56 @Category(MediumTests.class)
57 public class TestMultiParallel {
58 private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
59 {
60 ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
61 ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
62 }
63 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
64 private static final byte[] VALUE = Bytes.toBytes("value");
65 private static final byte[] QUALIFIER = Bytes.toBytes("qual");
66 private static final String FAMILY = "family";
67 private static final String TEST_TABLE = "multi_test_table";
68 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
69 private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
70 private static final byte [][] KEYS = makeKeys();
71
72 private static final int slaves = 5;
73
74 @BeforeClass public static void beforeClass() throws Exception {
75 ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
76 ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
77 ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
78 UTIL.startMiniCluster(slaves);
79 HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
80 UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
81 UTIL.waitTableEnabled(Bytes.toBytes(TEST_TABLE));
82 t.close();
83 }
84
85 @AfterClass public static void afterClass() throws Exception {
86 UTIL.shutdownMiniCluster();
87 }
88
89 @Before public void before() throws Exception {
90 LOG.info("before");
91 if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
92
93 UTIL.getMiniHBaseCluster().getMaster().balance();
94
95
96 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
97 }
98 HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
99 conn.clearRegionCache();
100 conn.close();
101 LOG.info("before done");
102 }
103
104 private static byte[][] makeKeys() {
105 byte [][] starterKeys = HBaseTestingUtility.KEYS;
106
107
108
109
110
111 int numKeys = (int) ((float) starterKeys.length * 10.33F);
112
113 List<byte[]> keys = new ArrayList<byte[]>();
114 for (int i = 0; i < numKeys; i++) {
115 int kIdx = i % starterKeys.length;
116 byte[] k = starterKeys[kIdx];
117 byte[] cp = new byte[k.length + 1];
118 System.arraycopy(k, 0, cp, 0, k.length);
119 cp[k.length] = new Integer(i % 256).byteValue();
120 keys.add(cp);
121 }
122
123
124
125
126
127 for (int i = 0; i < 100; i++) {
128 int kIdx = i % starterKeys.length;
129 byte[] k = starterKeys[kIdx];
130 byte[] cp = new byte[k.length + 1];
131 System.arraycopy(k, 0, cp, 0, k.length);
132 cp[k.length] = new Integer(i % 256).byteValue();
133 keys.add(cp);
134 }
135 return keys.toArray(new byte [][] {new byte [] {}});
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149 @Test(timeout=300000)
150 public void testActiveThreadsCount() throws Exception{
151 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
152 List<Row> puts = constructPutRequests();
153 table.batch(puts);
154 Field poolField = table.getClass().getDeclaredField("pool");
155 poolField.setAccessible(true);
156 ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
157 assertEquals(slaves, tExecutor.getLargestPoolSize());
158 table.close();
159 }
160
161 @Test(timeout=300000)
162 public void testBatchWithGet() throws Exception {
163 LOG.info("test=testBatchWithGet");
164 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
165
166
167 List<Row> puts = constructPutRequests();
168 table.batch(puts);
169
170
171 List<Row> gets = new ArrayList<Row>();
172 for (byte[] k : KEYS) {
173 Get get = new Get(k);
174 get.addColumn(BYTES_FAMILY, QUALIFIER);
175 gets.add(get);
176 }
177 Result[] multiRes = new Result[gets.size()];
178 table.batch(gets, multiRes);
179
180
181 List<Result> singleRes = new ArrayList<Result>();
182 for (Row get : gets) {
183 singleRes.add(table.get((Get) get));
184 }
185
186 Assert.assertEquals(singleRes.size(), multiRes.length);
187 for (int i = 0; i < singleRes.size(); i++) {
188 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
189 Cell[] singleKvs = singleRes.get(i).rawCells();
190 Cell[] multiKvs = multiRes[i].rawCells();
191 for (int j = 0; j < singleKvs.length; j++) {
192 Assert.assertEquals(singleKvs[j], multiKvs[j]);
193 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
194 CellUtil.cloneValue(multiKvs[j])));
195 }
196 }
197 table.close();
198 }
199
200 @Test
201 public void testBadFam() throws Exception {
202 LOG.info("test=testBadFam");
203 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
204
205 List<Row> actions = new ArrayList<Row>();
206 Put p = new Put(Bytes.toBytes("row1"));
207 p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
208 actions.add(p);
209 p = new Put(Bytes.toBytes("row2"));
210 p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
211 actions.add(p);
212
213
214
215 Object [] r = new Object[actions.size()];
216 try {
217 table.batch(actions, r);
218 fail();
219 } catch (RetriesExhaustedWithDetailsException ex) {
220 LOG.debug(ex);
221
222 assertFalse(ex.mayHaveClusterIssues());
223 }
224 assertEquals(2, r.length);
225 assertTrue(r[0] instanceof Throwable);
226 assertTrue(r[1] instanceof Result);
227 table.close();
228 }
229
230 @Test (timeout=300000)
231 public void testFlushCommitsNoAbort() throws Exception {
232 LOG.info("test=testFlushCommitsNoAbort");
233 doTestFlushCommits(false);
234 }
235
236
237
238
239
240
241
242 @Test (timeout=300000)
243 public void testFlushCommitsWithAbort() throws Exception {
244 LOG.info("test=testFlushCommitsWithAbort");
245 doTestFlushCommits(true);
246 }
247
248
249
250
251
252
253 private void doTestFlushCommits(boolean doAbort) throws Exception {
254
255 LOG.info("get new table");
256 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
257 table.setAutoFlush(false, true);
258 table.setWriteBufferSize(10 * 1024 * 1024);
259
260 LOG.info("constructPutRequests");
261 List<Row> puts = constructPutRequests();
262 for (Row put : puts) {
263 table.put((Put) put);
264 }
265 LOG.info("puts");
266 table.flushCommits();
267 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
268 .size();
269 assert liveRScount > 0;
270 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
271 .getLiveRegionServerThreads().get(0);
272 if (doAbort) {
273 liveRS.getRegionServer().abort("Aborting for tests",
274 new Exception("doTestFlushCommits"));
275
276
277
278
279 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
280 Thread.sleep(10);
281 }
282
283
284 puts = constructPutRequests();
285 for (Row put : puts) {
286 table.put((Put) put);
287 }
288
289 table.flushCommits();
290 }
291
292 LOG.info("validating loaded data");
293 validateLoadedData(table);
294
295
296 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
297 int count = 0;
298 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
299 count++;
300 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
301 }
302 LOG.info("Count=" + count);
303 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
304 (doAbort ? (liveRScount - 1) : liveRScount), count);
305 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
306 int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
307
308 }
309 if (doAbort) {
310 UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
311 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
312 @Override
313 public boolean evaluate() throws Exception {
314 return UTIL.getMiniHBaseCluster().getMaster()
315 .getClusterStatus().getServersSize() == (liveRScount - 1);
316 }
317 });
318 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
319 }
320
321 table.close();
322 LOG.info("done");
323 }
324
325 @Test (timeout=300000)
326 public void testBatchWithPut() throws Exception {
327 LOG.info("test=testBatchWithPut");
328 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
329
330
331 List<Row> puts = constructPutRequests();
332
333 Object[] results = table.batch(puts);
334 validateSizeAndEmpty(results, KEYS.length);
335
336 if (true) {
337 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
338 assert liveRScount > 0;
339 JVMClusterUtil.RegionServerThread liveRS =
340 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
341 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
342 puts = constructPutRequests();
343 try {
344 results = table.batch(puts);
345 } catch (RetriesExhaustedWithDetailsException ree) {
346 LOG.info(ree.getExhaustiveDescription());
347 throw ree;
348 } finally {
349 table.close();
350 }
351 validateSizeAndEmpty(results, KEYS.length);
352 }
353
354 validateLoadedData(table);
355 table.close();
356 }
357
358 @Test(timeout=300000)
359 public void testBatchWithDelete() throws Exception {
360 LOG.info("test=testBatchWithDelete");
361 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
362
363
364 List<Row> puts = constructPutRequests();
365 Object[] results = table.batch(puts);
366 validateSizeAndEmpty(results, KEYS.length);
367
368
369 List<Row> deletes = new ArrayList<Row>();
370 for (int i = 0; i < KEYS.length; i++) {
371 Delete delete = new Delete(KEYS[i]);
372 delete.deleteFamily(BYTES_FAMILY);
373 deletes.add(delete);
374 }
375 results = table.batch(deletes);
376 validateSizeAndEmpty(results, KEYS.length);
377
378
379 for (byte[] k : KEYS) {
380 Get get = new Get(k);
381 get.addColumn(BYTES_FAMILY, QUALIFIER);
382 Assert.assertFalse(table.exists(get));
383 }
384 table.close();
385 }
386
387 @Test(timeout=300000)
388 public void testHTableDeleteWithList() throws Exception {
389 LOG.info("test=testHTableDeleteWithList");
390 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
391
392
393 List<Row> puts = constructPutRequests();
394 Object[] results = table.batch(puts);
395 validateSizeAndEmpty(results, KEYS.length);
396
397
398 ArrayList<Delete> deletes = new ArrayList<Delete>();
399 for (int i = 0; i < KEYS.length; i++) {
400 Delete delete = new Delete(KEYS[i]);
401 delete.deleteFamily(BYTES_FAMILY);
402 deletes.add(delete);
403 }
404 table.delete(deletes);
405 Assert.assertTrue(deletes.isEmpty());
406
407
408 for (byte[] k : KEYS) {
409 Get get = new Get(k);
410 get.addColumn(BYTES_FAMILY, QUALIFIER);
411 Assert.assertFalse(table.exists(get));
412 }
413 table.close();
414 }
415
416 @Test(timeout=300000)
417 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
418 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
419 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
420
421 List<Row> puts = new ArrayList<Row>();
422 for (int i = 0; i < 100; i++) {
423 Put put = new Put(ONE_ROW);
424 byte[] qual = Bytes.toBytes("column" + i);
425 put.add(BYTES_FAMILY, qual, VALUE);
426 puts.add(put);
427 }
428 Object[] results = table.batch(puts);
429
430
431 validateSizeAndEmpty(results, 100);
432
433
434 List<Row> gets = new ArrayList<Row>();
435 for (int i = 0; i < 100; i++) {
436 Get get = new Get(ONE_ROW);
437 byte[] qual = Bytes.toBytes("column" + i);
438 get.addColumn(BYTES_FAMILY, qual);
439 gets.add(get);
440 }
441
442 Object[] multiRes = table.batch(gets);
443
444 int idx = 0;
445 for (Object r : multiRes) {
446 byte[] qual = Bytes.toBytes("column" + idx);
447 validateResult(r, qual, VALUE);
448 idx++;
449 }
450 table.close();
451 }
452
453 @Test(timeout=300000)
454 public void testBatchWithIncrementAndAppend() throws Exception {
455 LOG.info("test=testBatchWithIncrementAndAppend");
456 final byte[] QUAL1 = Bytes.toBytes("qual1");
457 final byte[] QUAL2 = Bytes.toBytes("qual2");
458 final byte[] QUAL3 = Bytes.toBytes("qual3");
459 final byte[] QUAL4 = Bytes.toBytes("qual4");
460 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
461 Delete d = new Delete(ONE_ROW);
462 table.delete(d);
463 Put put = new Put(ONE_ROW);
464 put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
465 put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
466 table.put(put);
467
468 Increment inc = new Increment(ONE_ROW);
469 inc.addColumn(BYTES_FAMILY, QUAL2, 1);
470 inc.addColumn(BYTES_FAMILY, QUAL3, 1);
471
472 Append a = new Append(ONE_ROW);
473 a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
474 a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
475 List<Row> actions = new ArrayList<Row>();
476 actions.add(inc);
477 actions.add(a);
478
479 Object[] multiRes = table.batch(actions);
480 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
481 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
482 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
483 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
484 table.close();
485 }
486
487 @Test(timeout=300000)
488 public void testNonceCollision() throws Exception {
489 LOG.info("test=testNonceCollision");
490 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
491 Put put = new Put(ONE_ROW);
492 put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
493
494
495 NonceGenerator cnm = new PerClientRandomNonceGenerator() {
496 long lastNonce = -1;
497 @Override
498 public synchronized long newNonce() {
499 long nonce = 0;
500 if (lastNonce == -1) {
501 lastNonce = nonce = super.newNonce();
502 } else {
503 nonce = lastNonce;
504 lastNonce = -1L;
505 }
506 return nonce;
507 }
508 };
509 NonceGenerator oldCnm =
510 HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), cnm);
511
512
513 try {
514 Increment inc = new Increment(ONE_ROW);
515 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
516 table.increment(inc);
517 inc = new Increment(ONE_ROW);
518 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
519 try {
520 table.increment(inc);
521 fail("Should have thrown an exception");
522 } catch (OperationConflictException ex) {
523 }
524 Get get = new Get(ONE_ROW);
525 get.addColumn(BYTES_FAMILY, QUALIFIER);
526 Result result = table.get(get);
527 validateResult(result, QUALIFIER, Bytes.toBytes(1L));
528
529
530 int numRequests = 40;
531 final CountDownLatch startedLatch = new CountDownLatch(numRequests);
532 final CountDownLatch startLatch = new CountDownLatch(1);
533 final CountDownLatch doneLatch = new CountDownLatch(numRequests);
534 for (int i = 0; i < numRequests; ++i) {
535 Runnable r = new Runnable() {
536 @Override
537 public void run() {
538 HTable table = null;
539 try {
540 table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
541 } catch (IOException e) {
542 fail("Not expected");
543 }
544 Increment inc = new Increment(ONE_ROW);
545 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
546 startedLatch.countDown();
547 try {
548 startLatch.await();
549 } catch (InterruptedException e) {
550 fail("Not expected");
551 }
552 try {
553 table.increment(inc);
554 } catch (OperationConflictException ex) {
555 } catch (IOException ioEx) {
556 fail("Not expected");
557 }
558 doneLatch.countDown();
559 }
560 };
561 Threads.setDaemonThreadRunning(new Thread(r));
562 }
563 startedLatch.await();
564 startLatch.countDown();
565 doneLatch.await();
566
567 get = new Get(ONE_ROW);
568 get.addColumn(BYTES_FAMILY, QUALIFIER);
569 result = table.get(get);
570 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
571 table.close();
572 } finally {
573 HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm);
574 }
575 }
576
577 @Test(timeout=300000)
578 public void testBatchWithMixedActions() throws Exception {
579 LOG.info("test=testBatchWithMixedActions");
580 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
581
582
583 Object[] results = table.batch(constructPutRequests());
584 validateSizeAndEmpty(results, KEYS.length);
585
586
587
588 List<Row> actions = new ArrayList<Row>();
589
590 byte[] qual2 = Bytes.toBytes("qual2");
591 byte[] val2 = Bytes.toBytes("putvalue2");
592
593
594 Get get = new Get(KEYS[10]);
595 get.addColumn(BYTES_FAMILY, QUALIFIER);
596 actions.add(get);
597
598
599 get = new Get(KEYS[11]);
600 get.addColumn(BYTES_FAMILY, QUALIFIER);
601 actions.add(get);
602
603
604 Put put = new Put(KEYS[10]);
605 put.add(BYTES_FAMILY, qual2, val2);
606 actions.add(put);
607
608
609 Delete delete = new Delete(KEYS[20]);
610 delete.deleteFamily(BYTES_FAMILY);
611 actions.add(delete);
612
613
614 get = new Get(KEYS[30]);
615 get.addColumn(BYTES_FAMILY, QUALIFIER);
616 actions.add(get);
617
618
619
620
621
622
623 put = new Put(KEYS[40]);
624 put.add(BYTES_FAMILY, qual2, val2);
625 actions.add(put);
626
627 results = table.batch(actions);
628
629
630
631 validateResult(results[0]);
632 validateResult(results[1]);
633 validateEmpty(results[2]);
634 validateEmpty(results[3]);
635 validateResult(results[4]);
636 validateEmpty(results[5]);
637
638
639 get = new Get(KEYS[40]);
640 get.addColumn(BYTES_FAMILY, qual2);
641 Result r = table.get(get);
642 validateResult(r, qual2, val2);
643
644 table.close();
645 }
646
647
648
649 private void validateResult(Object r) {
650 validateResult(r, QUALIFIER, VALUE);
651 }
652
653 private void validateResult(Object r1, byte[] qual, byte[] val) {
654 Result r = (Result)r1;
655 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
656 byte[] value = r.getValue(BYTES_FAMILY, qual);
657 if (0 != Bytes.compareTo(val, value)) {
658 fail("Expected [" + Bytes.toStringBinary(val)
659 + "] but got [" + Bytes.toStringBinary(value) + "]");
660 }
661 }
662
663 private List<Row> constructPutRequests() {
664 List<Row> puts = new ArrayList<Row>();
665 for (byte[] k : KEYS) {
666 Put put = new Put(k);
667 put.add(BYTES_FAMILY, QUALIFIER, VALUE);
668 puts.add(put);
669 }
670 return puts;
671 }
672
673 private void validateLoadedData(HTable table) throws IOException {
674
675 for (byte[] k : KEYS) {
676 Get get = new Get(k);
677 get.addColumn(BYTES_FAMILY, QUALIFIER);
678 Result r = table.get(get);
679 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
680 Assert.assertEquals(0, Bytes.compareTo(VALUE, r
681 .getValue(BYTES_FAMILY, QUALIFIER)));
682 }
683 }
684
685 private void validateEmpty(Object r1) {
686 Result result = (Result)r1;
687 Assert.assertTrue(result != null);
688 Assert.assertTrue(result.getRow() == null);
689 Assert.assertEquals(0, result.rawCells().length);
690 }
691
692 private void validateSizeAndEmpty(Object[] results, int expectedSize) {
693
694 Assert.assertEquals(expectedSize, results.length);
695 for (Object result : results) {
696 validateEmpty(result);
697 }
698 }
699 }