View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.TableName;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.HRegionLocation;
28  import org.apache.hadoop.hbase.MediumTests;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.Threads;
33  import org.junit.Assert;
34  import org.junit.Test;
35  import org.junit.experimental.categories.Category;
36  import org.mockito.Mockito;
37  
38  import java.io.IOException;
39  import java.io.InterruptedIOException;
40  import java.util.ArrayList;
41  import java.util.Arrays;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.SynchronousQueue;
46  import java.util.concurrent.ThreadFactory;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  import java.util.concurrent.atomic.AtomicInteger;
51  
52  @Category(MediumTests.class)
53  public class TestAsyncProcess {
54    private static final TableName DUMMY_TABLE =
55        TableName.valueOf("DUMMY_TABLE");
56    private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
57    private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
58    private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
59    private static final byte[] FAILS = "FAILS".getBytes();
60    private static final Configuration conf = new Configuration();
61  
62    private static ServerName sn = ServerName.valueOf("localhost:10,1254");
63    private static ServerName sn2 = ServerName.valueOf("localhost:140,12540");
64    private static HRegionInfo hri1 =
65        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
66    private static HRegionInfo hri2 =
67        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
68    private static HRegionInfo hri3 =
69        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
70    private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
71    private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
72    private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
73  
74    private static final String success = "success";
75    private static Exception failure = new Exception("failure");
76  
77    static class CountingThreadFactory implements ThreadFactory {
78      final AtomicInteger nbThreads;
79      ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
80      @Override
81      public Thread newThread(Runnable r) {
82        nbThreads.incrementAndGet();
83        return realFactory.newThread(r);
84      }
85  
86      CountingThreadFactory(AtomicInteger nbThreads){
87        this.nbThreads = nbThreads;
88      }
89    }
90  
91    static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
92      final AtomicInteger nbMultiResponse = new AtomicInteger();
93      final AtomicInteger nbActions = new AtomicInteger();
94  
95      public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
96        this(hc, callback, conf, new AtomicInteger());
97      }
98  
99        public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf,
100                           AtomicInteger nbThreads) {
101       super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
102         new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
103           callback, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
104     }
105 
106     @Override
107     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
108       final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(),
109           nbMultiResponse, nbActions);
110       return new RpcRetryingCaller<MultiResponse>(100, 10) {
111         @Override
112         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int to)
113         throws IOException, RuntimeException {
114           try {
115             // sleep one second in order for threadpool to start another thread instead of reusing
116             // existing one. 
117             Thread.sleep(1000);
118           } catch (InterruptedException e) {
119             // ignore error
120           }
121           return mr;
122         }
123       };
124     }
125   }
126 
127   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
128 
129     public CallerWithFailure() {
130       super(100, 100);
131     }
132 
133     @Override
134     public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int to)
135       throws IOException, RuntimeException {
136       throw new IOException("test");
137     }
138   }
139 
140   static class AsyncProcessWithFailure<Res> extends MyAsyncProcess<Res> {
141 
142     public AsyncProcessWithFailure(HConnection hc, Configuration conf) {
143       super(hc, null, conf, new AtomicInteger());
144       serverTrackerTimeout = 1;
145     }
146 
147     @Override
148     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
149       return new CallerWithFailure();
150     }
151   }
152 
153 
154   static MultiResponse createMultiResponse(final HRegionLocation loc,
155       final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
156     final MultiResponse mr = new MultiResponse();
157     nbMultiResponse.incrementAndGet();
158     for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
159       for (Action a : entry.getValue()) {
160         nbActions.incrementAndGet();
161         if (Arrays.equals(FAILS, a.getAction().getRow())) {
162           mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure);
163         } else {
164           mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success);
165         }
166       }
167     }
168     return mr;
169   }
170   /**
171    * Returns our async process.
172    */
173   static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
174     MyAsyncProcess<?> ap;
175     final AtomicInteger nbThreads = new AtomicInteger(0);
176     final static Configuration c = new Configuration();
177 
178     static {
179       c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
180     }
181 
182     protected MyConnectionImpl() {
183       super(c);
184     }
185 
186     protected MyConnectionImpl(Configuration conf) {
187       super(conf);
188     }
189 
190     @Override
191     protected <R> AsyncProcess createAsyncProcess(TableName tableName,
192                                                   ExecutorService pool,
193                                                   AsyncProcess.AsyncProcessCallback<R> callback,
194                                                   Configuration confn ) {
195       ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
196       return ap;
197     }
198 
199     @Override
200     public HRegionLocation locateRegion(final TableName tableName,
201                                         final byte[] row) {
202       return loc1;
203     }
204   }
205 
206   /**
207    * Returns our async process.
208    */
209   static class MyConnectionImpl2 extends MyConnectionImpl {
210     List<HRegionLocation> hrl;
211     final boolean usedRegions[];
212 
213     protected MyConnectionImpl2(List<HRegionLocation> hrl) {
214       super(c);
215       this.hrl = hrl;
216       this.usedRegions = new boolean[hrl.size()];
217     }
218 
219     @Override
220     public HRegionLocation locateRegion(final TableName tableName,
221                                         final byte[] row) {
222       int i = 0;
223       for (HRegionLocation hr:hrl){
224         if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
225             usedRegions[i] = true;
226           return hr;
227         }
228         i++;
229       }
230       return null;
231     }
232   }
233 
234   @Test
235   public void testSubmit() throws Exception {
236     HConnection hc = createHConnection();
237     AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
238 
239     List<Put> puts = new ArrayList<Put>();
240     puts.add(createPut(1, true));
241 
242     ap.submit(puts, false);
243     Assert.assertTrue(puts.isEmpty());
244   }
245 
246   @Test
247   public void testSubmitWithCB() throws Exception {
248     HConnection hc = createHConnection();
249     MyCB mcb = new MyCB();
250     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
251 
252     List<Put> puts = new ArrayList<Put>();
253     puts.add(createPut(1, true));
254 
255     ap.submit(puts, false);
256     Assert.assertTrue(puts.isEmpty());
257 
258     while (!(mcb.successCalled.get() == 1) && !ap.hasError()) {
259       Thread.sleep(1);
260     }
261     Assert.assertEquals(mcb.successCalled.get(), 1);
262   }
263 
264   @Test
265   public void testSubmitBusyRegion() throws Exception {
266     HConnection hc = createHConnection();
267     AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
268 
269     List<Put> puts = new ArrayList<Put>();
270     puts.add(createPut(1, true));
271 
272     ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
273     ap.submit(puts, false);
274     Assert.assertEquals(puts.size(), 1);
275 
276     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
277     ap.submit(puts, false);
278     Assert.assertTrue(puts.isEmpty());
279   }
280 
281 
282   @Test
283   public void testSubmitBusyRegionServer() throws Exception {
284     HConnection hc = createHConnection();
285     AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
286 
287     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
288 
289     List<Put> puts = new ArrayList<Put>();
290     puts.add(createPut(1, true));
291     puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
292     puts.add(createPut(1, true)); // <== this one will make it, the region is already in
293     puts.add(createPut(2, true)); // <== new region, but the rs is ok
294 
295     ap.submit(puts, false);
296     Assert.assertEquals(" puts=" + puts, 1, puts.size());
297 
298     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
299     ap.submit(puts, false);
300     Assert.assertTrue(puts.isEmpty());
301   }
302 
303   @Test
304   public void testFail() throws Exception {
305     HConnection hc = createHConnection();
306     MyCB mcb = new MyCB();
307     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
308 
309     List<Put> puts = new ArrayList<Put>();
310     Put p = createPut(1, false);
311     puts.add(p);
312 
313     ap.submit(puts, false);
314     Assert.assertTrue(puts.isEmpty());
315 
316     while (!ap.hasError()) {
317       Thread.sleep(1);
318     }
319 
320     Assert.assertEquals(0, mcb.successCalled.get());
321     Assert.assertEquals(2, mcb.retriableFailure.get());
322     Assert.assertEquals(1, mcb.failureCalled.get());
323 
324     Assert.assertEquals(1, ap.getErrors().exceptions.size());
325     Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
326         failure.equals(ap.getErrors().exceptions.get(0)));
327     Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
328         failure.equals(ap.getErrors().exceptions.get(0)));
329 
330     Assert.assertEquals(1, ap.getFailedOperations().size());
331     Assert.assertTrue("was: " + ap.getFailedOperations().get(0),
332         p.equals(ap.getFailedOperations().get(0)));
333   }
334 
335   @Test
336   public void testWaitForNextTaskDone() throws IOException {
337     HConnection hc = createHConnection();
338     MyCB mcb = new MyCB();
339     final AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
340     ap.tasksSent.incrementAndGet();
341 
342     final AtomicBoolean checkPoint = new AtomicBoolean(false);
343     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
344 
345     Thread t = new Thread(){
346       @Override
347       public void run(){
348         Threads.sleep(1000);
349         Assert.assertFalse(checkPoint.get());
350         ap.tasksDone.incrementAndGet();
351         checkPoint2.set(true);
352       }
353     };
354 
355     t.start();
356     ap.waitForNextTaskDone(0);
357     checkPoint.set(true);
358     while (!checkPoint2.get()){
359       Threads.sleep(1);
360     }
361   }
362 
363   @Test
364   public void testSubmitTrue() throws IOException {
365     HConnection hc = createHConnection();
366     MyCB mcb = new MyCB();
367     final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
368     ap.tasksSent.incrementAndGet();
369     final AtomicInteger ai = new AtomicInteger(1);
370     ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
371 
372     final AtomicBoolean checkPoint = new AtomicBoolean(false);
373     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
374 
375     Thread t = new Thread(){
376       @Override
377       public void run(){
378         Threads.sleep(1000);
379         Assert.assertFalse(checkPoint.get());
380         ai.decrementAndGet();
381         ap.tasksDone.incrementAndGet();
382         checkPoint2.set(true);
383       }
384     };
385 
386     List<Put> puts = new ArrayList<Put>();
387     Put p = createPut(1, true);
388     puts.add(p);
389 
390     ap.submit(puts, false);
391     Assert.assertFalse(puts.isEmpty());
392 
393     t.start();
394 
395     ap.submit(puts, true);
396     Assert.assertTrue(puts.isEmpty());
397 
398     checkPoint.set(true);
399     while (!checkPoint2.get()){
400       Threads.sleep(1);
401     }
402   }
403 
404   @Test
405   public void testFailAndSuccess() throws Exception {
406     HConnection hc = createHConnection();
407     MyCB mcb = new MyCB();
408     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
409 
410     List<Put> puts = new ArrayList<Put>();
411     puts.add(createPut(1, false));
412     puts.add(createPut(1, true));
413     puts.add(createPut(1, true));
414 
415     ap.submit(puts, false);
416     Assert.assertTrue(puts.isEmpty());
417 
418     while (!ap.hasError()) {
419       Thread.sleep(1);
420     }
421     ap.waitUntilDone();
422  
423     Assert.assertEquals(mcb.successCalled.get(), 2);
424     Assert.assertEquals(mcb.retriableFailure.get(), 2);
425     Assert.assertEquals(mcb.failureCalled.get(), 1);
426 
427     Assert.assertEquals(1, ap.getErrors().actions.size());
428 
429 
430     puts.add(createPut(1, true));
431     ap.submit(puts, false);
432     Assert.assertTrue(puts.isEmpty());
433 
434     while (mcb.successCalled.get() != 3) {
435       Thread.sleep(1);
436     }
437     Assert.assertEquals(mcb.retriableFailure.get(), 2);
438     Assert.assertEquals(mcb.failureCalled.get(), 1);
439 
440     ap.clearErrors();
441     Assert.assertTrue(ap.getErrors().actions.isEmpty());
442   }
443 
444   @Test
445   public void testFlush() throws Exception {
446     HConnection hc = createHConnection();
447     MyCB mcb = new MyCB();
448     AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
449 
450     List<Put> puts = new ArrayList<Put>();
451     puts.add(createPut(1, false));
452     puts.add(createPut(1, true));
453     puts.add(createPut(1, true));
454 
455     ap.submit(puts, false);
456     ap.waitUntilDone();
457 
458     Assert.assertEquals(mcb.successCalled.get(), 2);
459     Assert.assertEquals(mcb.retriableFailure.get(), 2);
460     Assert.assertEquals(mcb.failureCalled.get(), 1);
461 
462     Assert.assertEquals(1, ap.getFailedOperations().size());
463   }
464 
465   @Test
466   public void testMaxTask() throws Exception {
467     HConnection hc = createHConnection();
468     final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
469 
470     for (int i = 0; i < 1000; i++) {
471       ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
472     }
473 
474     final Thread myThread = Thread.currentThread();
475 
476     Thread t = new Thread() {
477       public void run() {
478         Threads.sleep(2000);
479         myThread.interrupt();
480       }
481     };
482 
483     List<Put> puts = new ArrayList<Put>();
484     puts.add(createPut(1, true));
485 
486     t.start();
487 
488     try {
489       ap.submit(puts, false);
490       Assert.fail("We should have been interrupted.");
491     } catch (InterruptedIOException expected) {
492     }
493 
494     final long sleepTime = 2000;
495 
496     Thread t2 = new Thread() {
497       public void run() {
498         Threads.sleep(sleepTime);
499         while (ap.tasksDone.get() > 0) {
500           ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
501         }
502       }
503     };
504     t2.start();
505 
506     long start = System.currentTimeMillis();
507     ap.submit(new ArrayList<Row>(), false);
508     long end = System.currentTimeMillis();
509 
510     //Adds 100 to secure us against approximate timing.
511     Assert.assertTrue(start + 100L + sleepTime > end);
512   }
513 
514 
515   private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
516     private final AtomicInteger successCalled = new AtomicInteger(0);
517     private final AtomicInteger failureCalled = new AtomicInteger(0);
518     private final AtomicInteger retriableFailure = new AtomicInteger(0);
519 
520 
521     @Override
522     public void success(int originalIndex, byte[] region, Row row, Object o) {
523       successCalled.incrementAndGet();
524     }
525 
526     @Override
527     public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
528       failureCalled.incrementAndGet();
529       return true;
530     }
531 
532     @Override
533     public boolean retriableFailure(int originalIndex, Row row, byte[] region,
534                                     Throwable exception) {
535       // We retry once only.
536       return (retriableFailure.incrementAndGet() < 2);
537     }
538   }
539 
540 
541   private static HConnection createHConnection() throws IOException {
542     HConnection hc = Mockito.mock(HConnection.class);
543 
544     Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
545         Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
546     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
547         Mockito.eq(DUMMY_BYTES_1))).thenReturn(loc1);
548 
549     Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
550         Mockito.eq(DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn(loc2);
551     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
552         Mockito.eq(DUMMY_BYTES_2))).thenReturn(loc2);
553 
554     Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
555         Mockito.eq(DUMMY_BYTES_3), Mockito.anyBoolean())).thenReturn(loc2);
556     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
557         Mockito.eq(DUMMY_BYTES_3))).thenReturn(loc3);
558 
559     Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
560         Mockito.eq(FAILS), Mockito.anyBoolean())).thenReturn(loc2);
561     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
562         Mockito.eq(FAILS))).thenReturn(loc2);
563 
564     return hc;
565   }
566 
567   @Test
568   public void testHTablePutSuccess() throws Exception {
569     HTable ht = Mockito.mock(HTable.class);
570     HConnection hc = createHConnection();
571     ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
572 
573     Put put = createPut(1, true);
574 
575     Assert.assertEquals(0, ht.getWriteBufferSize());
576     ht.put(put);
577     Assert.assertEquals(0, ht.getWriteBufferSize());
578   }
579 
580   private void doHTableFailedPut(boolean bufferOn) throws Exception {
581     HTable ht = new HTable();
582     HConnection hc = createHConnection();
583     MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
584     ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
585     ht.setAutoFlush(true, true);
586     if (bufferOn) {
587       ht.setWriteBufferSize(1024L * 1024L);
588     } else {
589       ht.setWriteBufferSize(0L);
590     }
591 
592     Put put = createPut(1, false);
593 
594     Assert.assertEquals(0L, ht.currentWriteBufferSize);
595     try {
596       ht.put(put);
597       if (bufferOn) {
598         ht.flushCommits();
599       }
600       Assert.fail();
601     } catch (RetriesExhaustedException expected) {
602     }
603     Assert.assertEquals(0L, ht.currentWriteBufferSize);
604     Assert.assertEquals(0, mcb.successCalled.get());
605     Assert.assertEquals(2, mcb.retriableFailure.get());
606     Assert.assertEquals(1, mcb.failureCalled.get());
607 
608     // This should not raise any exception, puts have been 'received' before by the catch.
609     ht.close();
610   }
611 
612   @Test
613   public void testHTableFailedPutWithBuffer() throws Exception {
614     doHTableFailedPut(true);
615   }
616 
617   @Test
618   public void doHTableFailedPutWithoutBuffer() throws Exception {
619     doHTableFailedPut(false);
620   }
621 
622   @Test
623   public void testHTableFailedPutAndNewPut() throws Exception {
624     HTable ht = new HTable();
625     HConnection hc = createHConnection();
626     MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
627     ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
628     ht.setAutoFlush(false, true);
629     ht.setWriteBufferSize(0);
630 
631     Put p = createPut(1, false);
632     ht.put(p);
633 
634     ht.ap.waitUntilDone(); // Let's do all the retries.
635 
636     // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
637     //  doPut if it fails.
638     // This said, it's not a very easy going behavior. For example, when we insert a list of
639     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
640     //  manage what was inserted, what was tried but failed, and what was not even tried.
641     p = createPut(1, true);
642     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
643     try {
644       ht.put(p);
645       Assert.fail();
646     } catch (RetriesExhaustedException expected) {
647     }
648     Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
649   }
650 
651 
652   @Test
653   public void testWithNoClearOnFail() throws IOException {
654     HTable ht = new HTable();
655     HConnection hc = createHConnection();
656     MyCB mcb = new MyCB();
657     ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
658     ht.setAutoFlush(false, false);
659 
660     Put p = createPut(1, false);
661     ht.put(p);
662     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
663     try {
664       ht.flushCommits();
665     } catch (RetriesExhaustedWithDetailsException expected) {
666     }
667     Assert.assertEquals(1, ht.writeAsyncBuffer.size());
668 
669     try {
670       ht.close();
671     } catch (RetriesExhaustedWithDetailsException expected) {
672     }
673     Assert.assertEquals(1, ht.writeAsyncBuffer.size());
674   }
675 
676   @Test
677   public void testBatch() throws IOException, InterruptedException {
678     HTable ht = new HTable();
679     ht.connection = new MyConnectionImpl();
680 
681     List<Put> puts = new ArrayList<Put>();
682     puts.add(createPut(1, true));
683     puts.add(createPut(1, true));
684     puts.add(createPut(1, true));
685     puts.add(createPut(1, true));
686     puts.add(createPut(1, false)); // <=== the bad apple, position 4
687     puts.add(createPut(1, true));
688     puts.add(createPut(1, false)); // <=== another bad apple, position 6
689 
690     Object[] res = new Object[puts.size()];
691     try {
692       ht.processBatch(puts, res);
693       Assert.fail();
694     } catch (RetriesExhaustedException expected) {
695     }
696 
697     Assert.assertEquals(res[0], success);
698     Assert.assertEquals(res[1], success);
699     Assert.assertEquals(res[2], success);
700     Assert.assertEquals(res[3], success);
701     Assert.assertEquals(res[4], failure);
702     Assert.assertEquals(res[5], success);
703     Assert.assertEquals(res[6], failure);
704   }
705 
706   @Test
707   public void testErrorsServers() throws IOException {
708     HTable ht = new HTable();
709     Configuration configuration = new Configuration(conf);
710     configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
711     configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
712     // set default writeBufferSize
713     ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
714 
715     MyConnectionImpl mci = new MyConnectionImpl(configuration);
716     ht.connection = mci;
717     ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
718 
719 
720     Assert.assertNotNull(ht.ap.createServerErrorTracker());
721     Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
722     ht.ap.serverTrackerTimeout = 1;
723 
724     Put p = createPut(1, false);
725     ht.setAutoFlush(false, false);
726     ht.put(p);
727 
728     try {
729       ht.flushCommits();
730       Assert.fail();
731     } catch (RetriesExhaustedWithDetailsException expected) {
732     }
733     // Checking that the ErrorsServers came into play and didn't make us stop immediately
734     Assert.assertEquals(ht.ap.tasksSent.get(), 3);
735   }
736 
737   @Test
738   public void testGlobalErrors() throws IOException {
739     HTable ht = new HTable();
740     Configuration configuration = new Configuration(conf);
741     configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
742     configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
743     ht.connection = new MyConnectionImpl(configuration);
744     AsyncProcessWithFailure<Object> ap =
745       new AsyncProcessWithFailure<Object>(ht.connection, configuration);
746     ht.ap = ap;
747 
748     Assert.assertNotNull(ht.ap.createServerErrorTracker());
749 
750     Put p = createPut(1, true);
751     ht.setAutoFlush(false, false);
752     ht.put(p);
753 
754     try {
755       ht.flushCommits();
756       Assert.fail();
757     } catch (RetriesExhaustedWithDetailsException expected) {
758     }
759     // Checking that the ErrorsServers came into play and didn't make us stop immediately
760     Assert.assertEquals(3, ht.ap.tasksSent.get());
761   }
762 
763 
764   /**
765    * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
766    *  2 threads: 1 per server, this whatever the number of regions.
767    */
768   @Test
769   public void testThreadCreation() throws Exception {
770     final int NB_REGS = 100;
771     List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
772     List<Get> gets = new ArrayList<Get>(NB_REGS);
773     for (int i = 0; i < NB_REGS; i++) {
774       HRegionInfo hri = new HRegionInfo(
775           DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
776       HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
777       hrls.add(hrl);
778 
779       Get get = new Get(Bytes.toBytes(i * 10L));
780       gets.add(get);
781     }
782 
783     HTable ht = new HTable();
784     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
785     ht.connection = con;
786 
787     ht.batch(gets);
788 
789     Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
790     Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
791     Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
792 
793     int nbReg = 0;
794     for (int i =0; i<NB_REGS; i++){
795       if (con.usedRegions[i]) nbReg++;
796     }
797     Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
798   }
799 
800 
801   /**
802    * @param regCnt  the region: 1 to 3.
803    * @param success if true, the put will succeed.
804    * @return a put
805    */
806   private Put createPut(int regCnt, boolean success) {
807     Put p;
808     if (!success) {
809       p = new Put(FAILS);
810     } else switch (regCnt){
811       case 1 :
812         p = new Put(DUMMY_BYTES_1);
813         break;
814       case 2:
815         p = new Put(DUMMY_BYTES_2);
816         break;
817       case 3:
818         p = new Put(DUMMY_BYTES_3);
819         break;
820       default:
821         throw new IllegalArgumentException("unknown " + regCnt);
822     }
823 
824     p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
825 
826     return p;
827   }
828 }