1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.TableName;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.HRegionLocation;
28 import org.apache.hadoop.hbase.MediumTests;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.hbase.util.Threads;
33 import org.junit.Assert;
34 import org.junit.Test;
35 import org.junit.experimental.categories.Category;
36 import org.mockito.Mockito;
37
38 import java.io.IOException;
39 import java.io.InterruptedIOException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.SynchronousQueue;
46 import java.util.concurrent.ThreadFactory;
47 import java.util.concurrent.ThreadPoolExecutor;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import java.util.concurrent.atomic.AtomicInteger;
51
52 @Category(MediumTests.class)
53 public class TestAsyncProcess {
54 private static final TableName DUMMY_TABLE =
55 TableName.valueOf("DUMMY_TABLE");
56 private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
57 private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
58 private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
59 private static final byte[] FAILS = "FAILS".getBytes();
60 private static final Configuration conf = new Configuration();
61
62 private static ServerName sn = ServerName.valueOf("localhost:10,1254");
63 private static ServerName sn2 = ServerName.valueOf("localhost:140,12540");
64 private static HRegionInfo hri1 =
65 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
66 private static HRegionInfo hri2 =
67 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
68 private static HRegionInfo hri3 =
69 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
70 private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
71 private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
72 private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
73
74 private static final String success = "success";
75 private static Exception failure = new Exception("failure");
76
77 static class CountingThreadFactory implements ThreadFactory {
78 final AtomicInteger nbThreads;
79 ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
80 @Override
81 public Thread newThread(Runnable r) {
82 nbThreads.incrementAndGet();
83 return realFactory.newThread(r);
84 }
85
86 CountingThreadFactory(AtomicInteger nbThreads){
87 this.nbThreads = nbThreads;
88 }
89 }
90
91 static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
92 final AtomicInteger nbMultiResponse = new AtomicInteger();
93 final AtomicInteger nbActions = new AtomicInteger();
94
95 public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
96 this(hc, callback, conf, new AtomicInteger());
97 }
98
99 public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf,
100 AtomicInteger nbThreads) {
101 super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
102 new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
103 callback, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
104 }
105
106 @Override
107 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
108 final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(),
109 nbMultiResponse, nbActions);
110 return new RpcRetryingCaller<MultiResponse>(100, 10) {
111 @Override
112 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int to)
113 throws IOException, RuntimeException {
114 try {
115
116
117 Thread.sleep(1000);
118 } catch (InterruptedException e) {
119
120 }
121 return mr;
122 }
123 };
124 }
125 }
126
127 static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
128
129 public CallerWithFailure() {
130 super(100, 100);
131 }
132
133 @Override
134 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int to)
135 throws IOException, RuntimeException {
136 throw new IOException("test");
137 }
138 }
139
140 static class AsyncProcessWithFailure<Res> extends MyAsyncProcess<Res> {
141
142 public AsyncProcessWithFailure(HConnection hc, Configuration conf) {
143 super(hc, null, conf, new AtomicInteger());
144 serverTrackerTimeout = 1;
145 }
146
147 @Override
148 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
149 return new CallerWithFailure();
150 }
151 }
152
153
154 static MultiResponse createMultiResponse(final HRegionLocation loc,
155 final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
156 final MultiResponse mr = new MultiResponse();
157 nbMultiResponse.incrementAndGet();
158 for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
159 for (Action a : entry.getValue()) {
160 nbActions.incrementAndGet();
161 if (Arrays.equals(FAILS, a.getAction().getRow())) {
162 mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
163 } else {
164 mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success);
165 }
166 }
167 }
168 return mr;
169 }
170
171
172
173 static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
174 MyAsyncProcess<?> ap;
175 final AtomicInteger nbThreads = new AtomicInteger(0);
176 final static Configuration c = new Configuration();
177
178 static {
179 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
180 }
181
182 protected MyConnectionImpl() {
183 super(c);
184 }
185
186 protected MyConnectionImpl(Configuration conf) {
187 super(conf);
188 }
189
190 @Override
191 protected <R> AsyncProcess createAsyncProcess(TableName tableName,
192 ExecutorService pool,
193 AsyncProcess.AsyncProcessCallback<R> callback,
194 Configuration confn ) {
195 ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
196 return ap;
197 }
198
199 @Override
200 public HRegionLocation locateRegion(final TableName tableName,
201 final byte[] row) {
202 return loc1;
203 }
204 }
205
206
207
208
209 static class MyConnectionImpl2 extends MyConnectionImpl {
210 List<HRegionLocation> hrl;
211 final boolean usedRegions[];
212
213 protected MyConnectionImpl2(List<HRegionLocation> hrl) {
214 super(c);
215 this.hrl = hrl;
216 this.usedRegions = new boolean[hrl.size()];
217 }
218
219 @Override
220 public HRegionLocation locateRegion(final TableName tableName,
221 final byte[] row) {
222 int i = 0;
223 for (HRegionLocation hr:hrl){
224 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
225 usedRegions[i] = true;
226 return hr;
227 }
228 i++;
229 }
230 return null;
231 }
232 }
233
234 @Test
235 public void testSubmit() throws Exception {
236 HConnection hc = createHConnection();
237 AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
238
239 List<Put> puts = new ArrayList<Put>();
240 puts.add(createPut(1, true));
241
242 ap.submit(puts, false);
243 Assert.assertTrue(puts.isEmpty());
244 }
245
246 @Test
247 public void testSubmitWithCB() throws Exception {
248 HConnection hc = createHConnection();
249 MyCB mcb = new MyCB();
250 AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
251
252 List<Put> puts = new ArrayList<Put>();
253 puts.add(createPut(1, true));
254
255 ap.submit(puts, false);
256 Assert.assertTrue(puts.isEmpty());
257
258 while (!(mcb.successCalled.get() == 1) && !ap.hasError()) {
259 Thread.sleep(1);
260 }
261 Assert.assertEquals(mcb.successCalled.get(), 1);
262 }
263
264 @Test
265 public void testSubmitBusyRegion() throws Exception {
266 HConnection hc = createHConnection();
267 AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
268
269 List<Put> puts = new ArrayList<Put>();
270 puts.add(createPut(1, true));
271
272 ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
273 ap.submit(puts, false);
274 Assert.assertEquals(puts.size(), 1);
275
276 ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
277 ap.submit(puts, false);
278 Assert.assertTrue(puts.isEmpty());
279 }
280
281
282 @Test
283 public void testSubmitBusyRegionServer() throws Exception {
284 HConnection hc = createHConnection();
285 AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
286
287 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
288
289 List<Put> puts = new ArrayList<Put>();
290 puts.add(createPut(1, true));
291 puts.add(createPut(3, true));
292 puts.add(createPut(1, true));
293 puts.add(createPut(2, true));
294
295 ap.submit(puts, false);
296 Assert.assertEquals(" puts=" + puts, 1, puts.size());
297
298 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
299 ap.submit(puts, false);
300 Assert.assertTrue(puts.isEmpty());
301 }
302
303 @Test
304 public void testFail() throws Exception {
305 HConnection hc = createHConnection();
306 MyCB mcb = new MyCB();
307 AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
308
309 List<Put> puts = new ArrayList<Put>();
310 Put p = createPut(1, false);
311 puts.add(p);
312
313 ap.submit(puts, false);
314 Assert.assertTrue(puts.isEmpty());
315
316 while (!ap.hasError()) {
317 Thread.sleep(1);
318 }
319
320 Assert.assertEquals(0, mcb.successCalled.get());
321 Assert.assertEquals(2, mcb.retriableFailure.get());
322 Assert.assertEquals(1, mcb.failureCalled.get());
323
324 Assert.assertEquals(1, ap.getErrors().exceptions.size());
325 Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
326 failure.equals(ap.getErrors().exceptions.get(0)));
327 Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
328 failure.equals(ap.getErrors().exceptions.get(0)));
329
330 Assert.assertEquals(1, ap.getFailedOperations().size());
331 Assert.assertTrue("was: " + ap.getFailedOperations().get(0),
332 p.equals(ap.getFailedOperations().get(0)));
333 }
334
335 @Test
336 public void testWaitForNextTaskDone() throws IOException {
337 HConnection hc = createHConnection();
338 MyCB mcb = new MyCB();
339 final AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
340 ap.tasksSent.incrementAndGet();
341
342 final AtomicBoolean checkPoint = new AtomicBoolean(false);
343 final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
344
345 Thread t = new Thread(){
346 @Override
347 public void run(){
348 Threads.sleep(1000);
349 Assert.assertFalse(checkPoint.get());
350 ap.tasksDone.incrementAndGet();
351 checkPoint2.set(true);
352 }
353 };
354
355 t.start();
356 ap.waitForNextTaskDone(0);
357 checkPoint.set(true);
358 while (!checkPoint2.get()){
359 Threads.sleep(1);
360 }
361 }
362
363 @Test
364 public void testSubmitTrue() throws IOException {
365 HConnection hc = createHConnection();
366 MyCB mcb = new MyCB();
367 final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
368 ap.tasksSent.incrementAndGet();
369 final AtomicInteger ai = new AtomicInteger(1);
370 ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
371
372 final AtomicBoolean checkPoint = new AtomicBoolean(false);
373 final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
374
375 Thread t = new Thread(){
376 @Override
377 public void run(){
378 Threads.sleep(1000);
379 Assert.assertFalse(checkPoint.get());
380 ai.decrementAndGet();
381 ap.tasksDone.incrementAndGet();
382 checkPoint2.set(true);
383 }
384 };
385
386 List<Put> puts = new ArrayList<Put>();
387 Put p = createPut(1, true);
388 puts.add(p);
389
390 ap.submit(puts, false);
391 Assert.assertFalse(puts.isEmpty());
392
393 t.start();
394
395 ap.submit(puts, true);
396 Assert.assertTrue(puts.isEmpty());
397
398 checkPoint.set(true);
399 while (!checkPoint2.get()){
400 Threads.sleep(1);
401 }
402 }
403
404 @Test
405 public void testFailAndSuccess() throws Exception {
406 HConnection hc = createHConnection();
407 MyCB mcb = new MyCB();
408 AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
409
410 List<Put> puts = new ArrayList<Put>();
411 puts.add(createPut(1, false));
412 puts.add(createPut(1, true));
413 puts.add(createPut(1, true));
414
415 ap.submit(puts, false);
416 Assert.assertTrue(puts.isEmpty());
417
418 while (!ap.hasError()) {
419 Thread.sleep(1);
420 }
421 ap.waitUntilDone();
422
423 Assert.assertEquals(mcb.successCalled.get(), 2);
424 Assert.assertEquals(mcb.retriableFailure.get(), 2);
425 Assert.assertEquals(mcb.failureCalled.get(), 1);
426
427 Assert.assertEquals(1, ap.getErrors().actions.size());
428
429
430 puts.add(createPut(1, true));
431 ap.submit(puts, false);
432 Assert.assertTrue(puts.isEmpty());
433
434 while (mcb.successCalled.get() != 3) {
435 Thread.sleep(1);
436 }
437 Assert.assertEquals(mcb.retriableFailure.get(), 2);
438 Assert.assertEquals(mcb.failureCalled.get(), 1);
439
440 ap.clearErrors();
441 Assert.assertTrue(ap.getErrors().actions.isEmpty());
442 }
443
444 @Test
445 public void testFlush() throws Exception {
446 HConnection hc = createHConnection();
447 MyCB mcb = new MyCB();
448 AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
449
450 List<Put> puts = new ArrayList<Put>();
451 puts.add(createPut(1, false));
452 puts.add(createPut(1, true));
453 puts.add(createPut(1, true));
454
455 ap.submit(puts, false);
456 ap.waitUntilDone();
457
458 Assert.assertEquals(mcb.successCalled.get(), 2);
459 Assert.assertEquals(mcb.retriableFailure.get(), 2);
460 Assert.assertEquals(mcb.failureCalled.get(), 1);
461
462 Assert.assertEquals(1, ap.getFailedOperations().size());
463 }
464
465 @Test
466 public void testMaxTask() throws Exception {
467 HConnection hc = createHConnection();
468 final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
469
470 for (int i = 0; i < 1000; i++) {
471 ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
472 }
473
474 final Thread myThread = Thread.currentThread();
475
476 Thread t = new Thread() {
477 public void run() {
478 Threads.sleep(2000);
479 myThread.interrupt();
480 }
481 };
482
483 List<Put> puts = new ArrayList<Put>();
484 puts.add(createPut(1, true));
485
486 t.start();
487
488 try {
489 ap.submit(puts, false);
490 Assert.fail("We should have been interrupted.");
491 } catch (InterruptedIOException expected) {
492 }
493
494 final long sleepTime = 2000;
495
496 Thread t2 = new Thread() {
497 public void run() {
498 Threads.sleep(sleepTime);
499 while (ap.tasksDone.get() > 0) {
500 ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
501 }
502 }
503 };
504 t2.start();
505
506 long start = System.currentTimeMillis();
507 ap.submit(new ArrayList<Row>(), false);
508 long end = System.currentTimeMillis();
509
510
511 Assert.assertTrue(start + 100L + sleepTime > end);
512 }
513
514
515 private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
516 private final AtomicInteger successCalled = new AtomicInteger(0);
517 private final AtomicInteger failureCalled = new AtomicInteger(0);
518 private final AtomicInteger retriableFailure = new AtomicInteger(0);
519
520
521 @Override
522 public void success(int originalIndex, byte[] region, Row row, Object o) {
523 successCalled.incrementAndGet();
524 }
525
526 @Override
527 public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
528 failureCalled.incrementAndGet();
529 return true;
530 }
531
532 @Override
533 public boolean retriableFailure(int originalIndex, Row row, byte[] region,
534 Throwable exception) {
535
536 return (retriableFailure.incrementAndGet() < 2);
537 }
538 }
539
540
541 private static HConnection createHConnection() throws IOException {
542 HConnection hc = Mockito.mock(HConnection.class);
543
544 Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
545 Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
546 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
547 Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1);
548
549 Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
550 Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2);
551 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
552 Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
553
554 Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
555 Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
556 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
557 Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
558
559 Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
560 Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
561 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
562 Mockito.eq(FAILS))).thenReturn(loc2);
563
564 return hc;
565 }
566
567 @Test
568 public void testHTablePutSuccess() throws Exception {
569 HTable ht = Mockito.mock(HTable.class);
570 HConnection hc = createHConnection();
571 ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
572
573 Put put = createPut(1, true);
574
575 Assert.assertEquals(0, ht.getWriteBufferSize());
576 ht.put(put);
577 Assert.assertEquals(0, ht.getWriteBufferSize());
578 }
579
580 private void doHTableFailedPut(boolean bufferOn) throws Exception {
581 HTable ht = new HTable();
582 HConnection hc = createHConnection();
583 MyCB mcb = new MyCB();
584 ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
585 ht.setAutoFlush(true, true);
586 if (bufferOn) {
587 ht.setWriteBufferSize(1024L * 1024L);
588 } else {
589 ht.setWriteBufferSize(0L);
590 }
591
592 Put put = createPut(1, false);
593
594 Assert.assertEquals(0L, ht.currentWriteBufferSize);
595 try {
596 ht.put(put);
597 if (bufferOn) {
598 ht.flushCommits();
599 }
600 Assert.fail();
601 } catch (RetriesExhaustedException expected) {
602 }
603 Assert.assertEquals(0L, ht.currentWriteBufferSize);
604 Assert.assertEquals(0, mcb.successCalled.get());
605 Assert.assertEquals(2, mcb.retriableFailure.get());
606 Assert.assertEquals(1, mcb.failureCalled.get());
607
608
609 ht.close();
610 }
611
612 @Test
613 public void testHTableFailedPutWithBuffer() throws Exception {
614 doHTableFailedPut(true);
615 }
616
617 @Test
618 public void doHTableFailedPutWithoutBuffer() throws Exception {
619 doHTableFailedPut(false);
620 }
621
622 @Test
623 public void testHTableFailedPutAndNewPut() throws Exception {
624 HTable ht = new HTable();
625 HConnection hc = createHConnection();
626 MyCB mcb = new MyCB();
627 ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
628 ht.setAutoFlush(false, true);
629 ht.setWriteBufferSize(0);
630
631 Put p = createPut(1, false);
632 ht.put(p);
633
634 ht.ap.waitUntilDone();
635
636
637
638
639
640
641 p = createPut(1, true);
642 Assert.assertEquals(0, ht.writeAsyncBuffer.size());
643 try {
644 ht.put(p);
645 Assert.fail();
646 } catch (RetriesExhaustedException expected) {
647 }
648 Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
649 }
650
651
652 @Test
653 public void testWithNoClearOnFail() throws IOException {
654 HTable ht = new HTable();
655 HConnection hc = createHConnection();
656 MyCB mcb = new MyCB();
657 ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
658 ht.setAutoFlush(false, false);
659
660 Put p = createPut(1, false);
661 ht.put(p);
662 Assert.assertEquals(0, ht.writeAsyncBuffer.size());
663 try {
664 ht.flushCommits();
665 } catch (RetriesExhaustedWithDetailsException expected) {
666 }
667 Assert.assertEquals(1, ht.writeAsyncBuffer.size());
668
669 try {
670 ht.close();
671 } catch (RetriesExhaustedWithDetailsException expected) {
672 }
673 Assert.assertEquals(1, ht.writeAsyncBuffer.size());
674 }
675
676 @Test
677 public void testBatch() throws IOException, InterruptedException {
678 HTable ht = new HTable();
679 ht.connection = new MyConnectionImpl();
680
681 List<Put> puts = new ArrayList<Put>();
682 puts.add(createPut(1, true));
683 puts.add(createPut(1, true));
684 puts.add(createPut(1, true));
685 puts.add(createPut(1, true));
686 puts.add(createPut(1, false));
687 puts.add(createPut(1, true));
688 puts.add(createPut(1, false));
689
690 Object[] res = new Object[puts.size()];
691 try {
692 ht.processBatch(puts, res);
693 Assert.fail();
694 } catch (RetriesExhaustedException expected) {
695 }
696
697 Assert.assertEquals(res[0], success);
698 Assert.assertEquals(res[1], success);
699 Assert.assertEquals(res[2], success);
700 Assert.assertEquals(res[3], success);
701 Assert.assertEquals(res[4], failure);
702 Assert.assertEquals(res[5], success);
703 Assert.assertEquals(res[6], failure);
704 }
705
706 @Test
707 public void testErrorsServers() throws IOException {
708 HTable ht = new HTable();
709 Configuration configuration = new Configuration(conf);
710 configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
711 configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
712
713 ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
714
715 MyConnectionImpl mci = new MyConnectionImpl(configuration);
716 ht.connection = mci;
717 ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
718
719
720 Assert.assertNotNull(ht.ap.createServerErrorTracker());
721 Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
722 ht.ap.serverTrackerTimeout = 1;
723
724 Put p = createPut(1, false);
725 ht.setAutoFlush(false, false);
726 ht.put(p);
727
728 try {
729 ht.flushCommits();
730 Assert.fail();
731 } catch (RetriesExhaustedWithDetailsException expected) {
732 }
733
734 Assert.assertEquals(ht.ap.tasksSent.get(), 3);
735 }
736
737 @Test
738 public void testGlobalErrors() throws IOException {
739 HTable ht = new HTable();
740 Configuration configuration = new Configuration(conf);
741 configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
742 configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
743 ht.connection = new MyConnectionImpl(configuration);
744 AsyncProcessWithFailure<Object> ap =
745 new AsyncProcessWithFailure<Object>(ht.connection, configuration);
746 ht.ap = ap;
747
748 Assert.assertNotNull(ht.ap.createServerErrorTracker());
749
750 Put p = createPut(1, true);
751 ht.setAutoFlush(false, false);
752 ht.put(p);
753
754 try {
755 ht.flushCommits();
756 Assert.fail();
757 } catch (RetriesExhaustedWithDetailsException expected) {
758 }
759
760 Assert.assertEquals(3, ht.ap.tasksSent.get());
761 }
762
763
764
765
766
767
768 @Test
769 public void testThreadCreation() throws Exception {
770 final int NB_REGS = 100;
771 List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
772 List<Get> gets = new ArrayList<Get>(NB_REGS);
773 for (int i = 0; i < NB_REGS; i++) {
774 HRegionInfo hri = new HRegionInfo(
775 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
776 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
777 hrls.add(hrl);
778
779 Get get = new Get(Bytes.toBytes(i * 10L));
780 gets.add(get);
781 }
782
783 HTable ht = new HTable();
784 MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
785 ht.connection = con;
786
787 ht.batch(gets);
788
789 Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
790 Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
791 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
792
793 int nbReg = 0;
794 for (int i =0; i<NB_REGS; i++){
795 if (con.usedRegions[i]) nbReg++;
796 }
797 Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
798 }
799
800
801
802
803
804
805
806 private Put createPut(int regCnt, boolean success) {
807 Put p;
808 if (!success) {
809 p = new Put(FAILS);
810 } else switch (regCnt){
811 case 1 :
812 p = new Put(DUMMY_BYTES_1);
813 break;
814 case 2:
815 p = new Put(DUMMY_BYTES_2);
816 break;
817 case 3:
818 p = new Put(DUMMY_BYTES_3);
819 break;
820 default:
821 throw new IllegalArgumentException("unknown " + regCnt);
822 }
823
824 p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
825
826 return p;
827 }
828 }