View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.lang.reflect.Field;
28  import java.util.ArrayList;
29  import java.util.List;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.ThreadPoolExecutor;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.commons.logging.impl.Log4JLogger;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseTestingUtility;
39  import org.apache.hadoop.hbase.MediumTests;
40  import org.apache.hadoop.hbase.Waiter;
41  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
42  import org.apache.hadoop.hbase.ipc.RpcClient;
43  import org.apache.hadoop.hbase.ipc.RpcServer;
44  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.JVMClusterUtil;
47  import org.apache.hadoop.hbase.util.Threads;
48  import org.apache.log4j.Level;
49  import org.junit.AfterClass;
50  import org.junit.Assert;
51  import org.junit.Before;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  @Category(MediumTests.class)
57  public class TestMultiParallel {
58    private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
59    {
60      ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
61      ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
62    }
63    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
64    private static final byte[] VALUE = Bytes.toBytes("value");
65    private static final byte[] QUALIFIER = Bytes.toBytes("qual");
66    private static final String FAMILY = "family";
67    private static final String TEST_TABLE = "multi_test_table";
68    private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
69    private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
70    private static final byte [][] KEYS = makeKeys();
71  
72    private static final int slaves = 5; // also used for testing HTable pool size
73  
74    @BeforeClass public static void beforeClass() throws Exception {
75      ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
76      ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
77      ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
78      UTIL.startMiniCluster(slaves);
79      HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
80      UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
81      UTIL.waitTableEnabled(Bytes.toBytes(TEST_TABLE));
82      t.close();
83    }
84  
85    @AfterClass public static void afterClass() throws Exception {
86      UTIL.shutdownMiniCluster();
87    }
88  
89    @Before public void before() throws Exception {
90      LOG.info("before");
91      if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
92        // Distribute regions
93        UTIL.getMiniHBaseCluster().getMaster().balance();
94  
95        // Wait until completing balance
96        UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
97      }
98      HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
99      conn.clearRegionCache();
100     conn.close();
101     LOG.info("before done");
102   }
103 
104   private static byte[][] makeKeys() {
105     byte [][] starterKeys = HBaseTestingUtility.KEYS;
106     // Create a "non-uniform" test set with the following characteristics:
107     // a) Unequal number of keys per region
108 
109     // Don't use integer as a multiple, so that we have a number of keys that is
110     // not a multiple of the number of regions
111     int numKeys = (int) ((float) starterKeys.length * 10.33F);
112 
113     List<byte[]> keys = new ArrayList<byte[]>();
114     for (int i = 0; i < numKeys; i++) {
115       int kIdx = i % starterKeys.length;
116       byte[] k = starterKeys[kIdx];
117       byte[] cp = new byte[k.length + 1];
118       System.arraycopy(k, 0, cp, 0, k.length);
119       cp[k.length] = new Integer(i % 256).byteValue();
120       keys.add(cp);
121     }
122 
123     // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
124     // should work)
125     // c) keys are not in sorted order (within a region), to ensure that the
126     // sorting code and index mapping doesn't break the functionality
127     for (int i = 0; i < 100; i++) {
128       int kIdx = i % starterKeys.length;
129       byte[] k = starterKeys[kIdx];
130       byte[] cp = new byte[k.length + 1];
131       System.arraycopy(k, 0, cp, 0, k.length);
132       cp[k.length] = new Integer(i % 256).byteValue();
133       keys.add(cp);
134     }
135     return keys.toArray(new byte [][] {new byte [] {}});
136   }
137 
138 
139   /**
140    * This is for testing the active number of threads that were used while
141    * doing a batch operation. It inserts one row per region via the batch
142    * operation, and then checks the number of active threads.
143    * For HBASE-3553
144    * @throws IOException
145    * @throws InterruptedException
146    * @throws NoSuchFieldException
147    * @throws SecurityException
148    */
149   @Test(timeout=300000)
150   public void testActiveThreadsCount() throws Exception{
151     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
152     List<Row> puts = constructPutRequests(); // creates a Put for every region
153     table.batch(puts);
154     Field poolField = table.getClass().getDeclaredField("pool");
155     poolField.setAccessible(true);
156     ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
157     assertEquals(slaves, tExecutor.getLargestPoolSize());
158     table.close();
159   }
160 
161   @Test(timeout=300000)
162   public void testBatchWithGet() throws Exception {
163     LOG.info("test=testBatchWithGet");
164     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
165 
166     // load test data
167     List<Row> puts = constructPutRequests();
168     table.batch(puts);
169 
170     // create a list of gets and run it
171     List<Row> gets = new ArrayList<Row>();
172     for (byte[] k : KEYS) {
173       Get get = new Get(k);
174       get.addColumn(BYTES_FAMILY, QUALIFIER);
175       gets.add(get);
176     }
177     Result[] multiRes = new Result[gets.size()];
178     table.batch(gets, multiRes);
179 
180     // Same gets using individual call API
181     List<Result> singleRes = new ArrayList<Result>();
182     for (Row get : gets) {
183       singleRes.add(table.get((Get) get));
184     }
185     // Compare results
186     Assert.assertEquals(singleRes.size(), multiRes.length);
187     for (int i = 0; i < singleRes.size(); i++) {
188       Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
189       Cell[] singleKvs = singleRes.get(i).rawCells();
190       Cell[] multiKvs = multiRes[i].rawCells();
191       for (int j = 0; j < singleKvs.length; j++) {
192         Assert.assertEquals(singleKvs[j], multiKvs[j]);
193         Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), 
194             CellUtil.cloneValue(multiKvs[j])));
195       }
196     }
197     table.close();
198   }
199 
200   @Test
201   public void testBadFam() throws Exception {
202     LOG.info("test=testBadFam");
203     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
204 
205     List<Row> actions = new ArrayList<Row>();
206     Put p = new Put(Bytes.toBytes("row1"));
207     p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
208     actions.add(p);
209     p = new Put(Bytes.toBytes("row2"));
210     p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
211     actions.add(p);
212 
213     // row1 and row2 should be in the same region.
214 
215     Object [] r = new Object[actions.size()];
216     try {
217       table.batch(actions, r);
218       fail();
219     } catch (RetriesExhaustedWithDetailsException ex) {
220       LOG.debug(ex);
221       // good!
222       assertFalse(ex.mayHaveClusterIssues());
223     }
224     assertEquals(2, r.length);
225     assertTrue(r[0] instanceof Throwable);
226     assertTrue(r[1] instanceof Result);
227     table.close();
228   }
229 
230   @Test (timeout=300000)
231   public void testFlushCommitsNoAbort() throws Exception {
232     LOG.info("test=testFlushCommitsNoAbort");
233     doTestFlushCommits(false);
234   }
235 
236   /**
237    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
238    * unit tests will take an unnecessarily long time to run.
239    *
240    * @throws Exception
241    */
242   @Test (timeout=300000)
243   public void testFlushCommitsWithAbort() throws Exception {
244     LOG.info("test=testFlushCommitsWithAbort");
245     doTestFlushCommits(true);
246   }
247 
248   /**
249    * Set table auto flush to false and test flushing commits
250    * @param doAbort true if abort one regionserver in the testing
251    * @throws Exception
252    */
253   private void doTestFlushCommits(boolean doAbort) throws Exception {
254     // Load the data
255     LOG.info("get new table");
256     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
257     table.setAutoFlush(false, true);
258     table.setWriteBufferSize(10 * 1024 * 1024);
259 
260     LOG.info("constructPutRequests");
261     List<Row> puts = constructPutRequests();
262     for (Row put : puts) {
263       table.put((Put) put);
264     }
265     LOG.info("puts");
266     table.flushCommits();
267     final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
268         .size();
269     assert liveRScount > 0;
270     JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
271         .getLiveRegionServerThreads().get(0);
272     if (doAbort) {
273       liveRS.getRegionServer().abort("Aborting for tests",
274           new Exception("doTestFlushCommits"));
275       // If we wait for no regions being online after we abort the server, we
276       // could ensure the master has re-assigned the regions on killed server
277       // after writing successfully. It means the server we aborted is dead
278       // and detected by matser
279       while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
280         Thread.sleep(10);
281       }
282       // try putting more keys after the abort. same key/qual... just validating
283       // no exceptions thrown
284       puts = constructPutRequests();
285       for (Row put : puts) {
286         table.put((Put) put);
287       }
288 
289       table.flushCommits();
290     }
291 
292     LOG.info("validating loaded data");
293     validateLoadedData(table);
294 
295     // Validate server and region count
296     List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
297     int count = 0;
298     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
299       count++;
300       LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
301     }
302     LOG.info("Count=" + count);
303     Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
304         (doAbort ? (liveRScount - 1) : liveRScount), count);
305     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
306       int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
307       // Assert.assertTrue("Count of regions=" + regions, regions > 10);
308     }
309     if (doAbort) {
310       UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
311       UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
312         @Override
313         public boolean evaluate() throws Exception {
314           return UTIL.getMiniHBaseCluster().getMaster()
315               .getClusterStatus().getServersSize() == (liveRScount - 1);
316         }
317       });
318       UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
319     }
320 
321     table.close();
322     LOG.info("done");
323   }
324 
325   @Test (timeout=300000)
326   public void testBatchWithPut() throws Exception {
327     LOG.info("test=testBatchWithPut");
328     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
329 
330     // put multiple rows using a batch
331     List<Row> puts = constructPutRequests();
332 
333     Object[] results = table.batch(puts);
334     validateSizeAndEmpty(results, KEYS.length);
335 
336     if (true) {
337       int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
338       assert liveRScount > 0;
339       JVMClusterUtil.RegionServerThread liveRS =
340         UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
341       liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
342       puts = constructPutRequests();
343       try {
344         results = table.batch(puts);
345       } catch (RetriesExhaustedWithDetailsException ree) {
346         LOG.info(ree.getExhaustiveDescription());
347         throw ree;
348       } finally {
349         table.close();
350       }
351       validateSizeAndEmpty(results, KEYS.length);
352     }
353 
354     validateLoadedData(table);
355     table.close();
356   }
357 
358   @Test(timeout=300000)
359   public void testBatchWithDelete() throws Exception {
360     LOG.info("test=testBatchWithDelete");
361     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
362 
363     // Load some data
364     List<Row> puts = constructPutRequests();
365     Object[] results = table.batch(puts);
366     validateSizeAndEmpty(results, KEYS.length);
367 
368     // Deletes
369     List<Row> deletes = new ArrayList<Row>();
370     for (int i = 0; i < KEYS.length; i++) {
371       Delete delete = new Delete(KEYS[i]);
372       delete.deleteFamily(BYTES_FAMILY);
373       deletes.add(delete);
374     }
375     results = table.batch(deletes);
376     validateSizeAndEmpty(results, KEYS.length);
377 
378     // Get to make sure ...
379     for (byte[] k : KEYS) {
380       Get get = new Get(k);
381       get.addColumn(BYTES_FAMILY, QUALIFIER);
382       Assert.assertFalse(table.exists(get));
383     }
384     table.close();
385   }
386 
387   @Test(timeout=300000)
388   public void testHTableDeleteWithList() throws Exception {
389     LOG.info("test=testHTableDeleteWithList");
390     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
391 
392     // Load some data
393     List<Row> puts = constructPutRequests();
394     Object[] results = table.batch(puts);
395     validateSizeAndEmpty(results, KEYS.length);
396 
397     // Deletes
398     ArrayList<Delete> deletes = new ArrayList<Delete>();
399     for (int i = 0; i < KEYS.length; i++) {
400       Delete delete = new Delete(KEYS[i]);
401       delete.deleteFamily(BYTES_FAMILY);
402       deletes.add(delete);
403     }
404     table.delete(deletes);
405     Assert.assertTrue(deletes.isEmpty());
406 
407     // Get to make sure ...
408     for (byte[] k : KEYS) {
409       Get get = new Get(k);
410       get.addColumn(BYTES_FAMILY, QUALIFIER);
411       Assert.assertFalse(table.exists(get));
412     }
413     table.close();
414   }
415 
416   @Test(timeout=300000)
417   public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
418     LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
419     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
420 
421     List<Row> puts = new ArrayList<Row>();
422     for (int i = 0; i < 100; i++) {
423       Put put = new Put(ONE_ROW);
424       byte[] qual = Bytes.toBytes("column" + i);
425       put.add(BYTES_FAMILY, qual, VALUE);
426       puts.add(put);
427     }
428     Object[] results = table.batch(puts);
429 
430     // validate
431     validateSizeAndEmpty(results, 100);
432 
433     // get the data back and validate that it is correct
434     List<Row> gets = new ArrayList<Row>();
435     for (int i = 0; i < 100; i++) {
436       Get get = new Get(ONE_ROW);
437       byte[] qual = Bytes.toBytes("column" + i);
438       get.addColumn(BYTES_FAMILY, qual);
439       gets.add(get);
440     }
441 
442     Object[] multiRes = table.batch(gets);
443 
444     int idx = 0;
445     for (Object r : multiRes) {
446       byte[] qual = Bytes.toBytes("column" + idx);
447       validateResult(r, qual, VALUE);
448       idx++;
449     }
450     table.close();
451   }
452 
453   @Test(timeout=300000)
454   public void testBatchWithIncrementAndAppend() throws Exception {
455     LOG.info("test=testBatchWithIncrementAndAppend");
456     final byte[] QUAL1 = Bytes.toBytes("qual1");
457     final byte[] QUAL2 = Bytes.toBytes("qual2");
458     final byte[] QUAL3 = Bytes.toBytes("qual3");
459     final byte[] QUAL4 = Bytes.toBytes("qual4");
460     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
461     Delete d = new Delete(ONE_ROW);
462     table.delete(d);
463     Put put = new Put(ONE_ROW);
464     put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
465     put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
466     table.put(put);
467 
468     Increment inc = new Increment(ONE_ROW);
469     inc.addColumn(BYTES_FAMILY, QUAL2, 1);
470     inc.addColumn(BYTES_FAMILY, QUAL3, 1);
471 
472     Append a = new Append(ONE_ROW);
473     a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
474     a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
475     List<Row> actions = new ArrayList<Row>();
476     actions.add(inc);
477     actions.add(a);
478 
479     Object[] multiRes = table.batch(actions);
480     validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
481     validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
482     validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
483     validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
484     table.close();
485   }
486 
487   @Test(timeout=300000)
488   public void testNonceCollision() throws Exception {
489     LOG.info("test=testNonceCollision");
490     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
491     Put put = new Put(ONE_ROW);
492     put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
493 
494     // Replace nonce manager with the one that returns each nonce twice.
495     NonceGenerator cnm = new PerClientRandomNonceGenerator() {
496       long lastNonce = -1;
497       @Override
498       public synchronized long newNonce() {
499         long nonce = 0;
500         if (lastNonce == -1) {
501           lastNonce = nonce = super.newNonce();
502         } else {
503           nonce = lastNonce;
504           lastNonce = -1L;
505         }
506         return nonce;
507       }
508     };
509     NonceGenerator oldCnm =
510         HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), cnm);
511 
512     // First test sequential requests.
513     try {
514       Increment inc = new Increment(ONE_ROW);
515       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
516       table.increment(inc);
517       inc = new Increment(ONE_ROW);
518       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
519       try {
520         table.increment(inc);
521         fail("Should have thrown an exception");
522       } catch (OperationConflictException ex) {
523       }
524       Get get = new Get(ONE_ROW);
525       get.addColumn(BYTES_FAMILY, QUALIFIER);
526       Result result = table.get(get);
527       validateResult(result, QUALIFIER, Bytes.toBytes(1L));
528 
529       // Now run a bunch of requests in parallel, exactly half should succeed.
530       int numRequests = 40;
531       final CountDownLatch startedLatch = new CountDownLatch(numRequests);
532       final CountDownLatch startLatch = new CountDownLatch(1);
533       final CountDownLatch doneLatch = new CountDownLatch(numRequests);
534       for (int i = 0; i < numRequests; ++i) {
535         Runnable r = new Runnable() {
536           @Override
537           public void run() {
538             HTable table = null;
539             try {
540               table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
541             } catch (IOException e) {
542               fail("Not expected");
543             }
544             Increment inc = new Increment(ONE_ROW);
545             inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
546             startedLatch.countDown();
547             try {
548               startLatch.await();
549             } catch (InterruptedException e) {
550               fail("Not expected");
551             }
552             try {
553               table.increment(inc);
554             } catch (OperationConflictException ex) { // Some threads are expected to fail.
555             } catch (IOException ioEx) {
556               fail("Not expected");
557             }
558             doneLatch.countDown();
559           }
560         };
561         Threads.setDaemonThreadRunning(new Thread(r));
562       }
563       startedLatch.await(); // Wait until all threads are ready...
564       startLatch.countDown(); // ...and unleash the herd!
565       doneLatch.await();
566       // Now verify
567       get = new Get(ONE_ROW);
568       get.addColumn(BYTES_FAMILY, QUALIFIER);
569       result = table.get(get);
570       validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
571       table.close();
572     } finally {
573       HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm);
574     }
575   }
576 
577   @Test(timeout=300000)
578   public void testBatchWithMixedActions() throws Exception {
579     LOG.info("test=testBatchWithMixedActions");
580     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
581 
582     // Load some data to start
583     Object[] results = table.batch(constructPutRequests());
584     validateSizeAndEmpty(results, KEYS.length);
585 
586     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
587     // put
588     List<Row> actions = new ArrayList<Row>();
589 
590     byte[] qual2 = Bytes.toBytes("qual2");
591     byte[] val2 = Bytes.toBytes("putvalue2");
592 
593     // 0 get
594     Get get = new Get(KEYS[10]);
595     get.addColumn(BYTES_FAMILY, QUALIFIER);
596     actions.add(get);
597 
598     // 1 get
599     get = new Get(KEYS[11]);
600     get.addColumn(BYTES_FAMILY, QUALIFIER);
601     actions.add(get);
602 
603     // 2 put of new column
604     Put put = new Put(KEYS[10]);
605     put.add(BYTES_FAMILY, qual2, val2);
606     actions.add(put);
607 
608     // 3 delete
609     Delete delete = new Delete(KEYS[20]);
610     delete.deleteFamily(BYTES_FAMILY);
611     actions.add(delete);
612 
613     // 4 get
614     get = new Get(KEYS[30]);
615     get.addColumn(BYTES_FAMILY, QUALIFIER);
616     actions.add(get);
617 
618     // There used to be a 'get' of a previous put here, but removed
619     // since this API really cannot guarantee order in terms of mixed
620     // get/puts.
621 
622     // 5 put of new column
623     put = new Put(KEYS[40]);
624     put.add(BYTES_FAMILY, qual2, val2);
625     actions.add(put);
626 
627     results = table.batch(actions);
628 
629     // Validation
630 
631     validateResult(results[0]);
632     validateResult(results[1]);
633     validateEmpty(results[2]);
634     validateEmpty(results[3]);
635     validateResult(results[4]);
636     validateEmpty(results[5]);
637 
638     // validate last put, externally from the batch
639     get = new Get(KEYS[40]);
640     get.addColumn(BYTES_FAMILY, qual2);
641     Result r = table.get(get);
642     validateResult(r, qual2, val2);
643 
644     table.close();
645   }
646 
647   // // Helper methods ////
648 
649   private void validateResult(Object r) {
650     validateResult(r, QUALIFIER, VALUE);
651   }
652 
653   private void validateResult(Object r1, byte[] qual, byte[] val) {
654     Result r = (Result)r1;
655     Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
656     byte[] value = r.getValue(BYTES_FAMILY, qual);
657     if (0 != Bytes.compareTo(val, value)) {
658       fail("Expected [" + Bytes.toStringBinary(val)
659           + "] but got [" + Bytes.toStringBinary(value) + "]");
660     }
661   }
662 
663   private List<Row> constructPutRequests() {
664     List<Row> puts = new ArrayList<Row>();
665     for (byte[] k : KEYS) {
666       Put put = new Put(k);
667       put.add(BYTES_FAMILY, QUALIFIER, VALUE);
668       puts.add(put);
669     }
670     return puts;
671   }
672 
673   private void validateLoadedData(HTable table) throws IOException {
674     // get the data back and validate that it is correct
675     for (byte[] k : KEYS) {
676       Get get = new Get(k);
677       get.addColumn(BYTES_FAMILY, QUALIFIER);
678       Result r = table.get(get);
679       Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
680       Assert.assertEquals(0, Bytes.compareTo(VALUE, r
681           .getValue(BYTES_FAMILY, QUALIFIER)));
682     }
683   }
684 
685   private void validateEmpty(Object r1) {
686     Result result = (Result)r1;
687     Assert.assertTrue(result != null);
688     Assert.assertTrue(result.getRow() == null);
689     Assert.assertEquals(0, result.rawCells().length);
690   }
691 
692   private void validateSizeAndEmpty(Object[] results, int expectedSize) {
693     // Validate got back the same number of Result objects, all empty
694     Assert.assertEquals(expectedSize, results.length);
695     for (Object result : results) {
696       validateEmpty(result);
697     }
698   }
699 }