View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.fail;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.List;
29  import java.util.Random;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.MediumTests;
47  import org.apache.hadoop.hbase.MultithreadedTestUtil;
48  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
49  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.client.Append;
52  import org.apache.hadoop.hbase.client.Delete;
53  import org.apache.hadoop.hbase.client.Durability;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Increment;
56  import org.apache.hadoop.hbase.client.Mutation;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.client.Result;
59  import org.apache.hadoop.hbase.client.RowMutations;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.filter.BinaryComparator;
62  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
63  import org.apache.hadoop.hbase.io.HeapSize;
64  import org.apache.hadoop.hbase.regionserver.wal.HLog;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.junit.After;
67  import org.junit.Before;
68  import org.junit.Rule;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  import org.junit.rules.TestName;
72  
73  
/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment,
 * and HRegion.append, as well as HRegion.mutateRow,
 * HRegion.mutateRowsWithLocks and HRegion.checkAndMutate.
 */
78  @Category(MediumTests.class) // Starts 100 threads
79  public class TestAtomicOperation {
80    static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
81    @Rule public TestName name = new TestName();
82  
83    HRegion region = null;
84    private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
85  
86    // Test names
87    static  byte[] tableName;
88    static final byte[] qual1 = Bytes.toBytes("qual1");
89    static final byte[] qual2 = Bytes.toBytes("qual2");
90    static final byte[] qual3 = Bytes.toBytes("qual3");
91    static final byte[] value1 = Bytes.toBytes("value1");
92    static final byte[] value2 = Bytes.toBytes("value2");
93    static final byte [] row = Bytes.toBytes("rowA");
94    static final byte [] row2 = Bytes.toBytes("rowB");
95  
96    @Before 
97    public void setup() {
98      tableName = Bytes.toBytes(name.getMethodName());
99    }
100   
101   @After
102   public void teardown() throws IOException {
103     if (region != null) {
104       region.close();
105       region = null;
106     }
107   }
  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////
112 
113   /**
114    * Test basic append operation.
115    * More tests in
116    * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
117    */
118   @Test
119   public void testAppend() throws IOException {
120     initHRegion(tableName, name.getMethodName(), fam1);
121     String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
122     " The Universe, and Everything";
123     String v2 = " is... 42.";
124     Append a = new Append(row);
125     a.setReturnResults(false);
126     a.add(fam1, qual1, Bytes.toBytes(v1));
127     a.add(fam1, qual2, Bytes.toBytes(v2));
128     assertNull(region.append(a));
129     a = new Append(row);
130     a.add(fam1, qual1, Bytes.toBytes(v2));
131     a.add(fam1, qual2, Bytes.toBytes(v1));
132     Result result = region.append(a);
133     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
134     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
135   }
136 
137   /**
138    * Test multi-threaded increments.
139    */
140   @Test
141   public void testIncrementMultiThreads() throws IOException {
142 
143     LOG.info("Starting test testIncrementMultiThreads");
144     // run a with mixed column families (1 and 3 versions)
145     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
146 
147     // create 100 threads, each will increment by its own quantity
148     int numThreads = 100;
149     int incrementsPerThread = 1000;
150     Incrementer[] all = new Incrementer[numThreads];
151     int expectedTotal = 0;
152 
153     // create all threads
154     for (int i = 0; i < numThreads; i++) {
155       all[i] = new Incrementer(region, i, i, incrementsPerThread);
156       expectedTotal += (i * incrementsPerThread);
157     }
158 
159     // run all threads
160     for (int i = 0; i < numThreads; i++) {
161       all[i].start();
162     }
163 
164     // wait for all threads to finish
165     for (int i = 0; i < numThreads; i++) {
166       try {
167         all[i].join();
168       } catch (InterruptedException e) {
169       }
170     }
171     assertICV(row, fam1, qual1, expectedTotal);
172     assertICV(row, fam1, qual2, expectedTotal*2);
173     assertICV(row, fam2, qual3, expectedTotal*3);
174     LOG.info("testIncrementMultiThreads successfully verified that total is " +
175              expectedTotal);
176   }
177 
178 
179   private void assertICV(byte [] row,
180                          byte [] familiy,
181                          byte[] qualifier,
182                          long amount) throws IOException {
183     // run a get and see?
184     Get get = new Get(row);
185     get.addColumn(familiy, qualifier);
186     Result result = region.get(get);
187     assertEquals(1, result.size());
188 
189     Cell kv = result.rawCells()[0];
190     long r = Bytes.toLong(CellUtil.cloneValue(kv));
191     assertEquals(amount, r);
192   }
193 
194   private void initHRegion (byte [] tableName, String callingMethod,
195       byte[] ... families)
196     throws IOException {
197     initHRegion(tableName, callingMethod, null, families);
198   }
199 
200   private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
201     byte[] ... families)
202   throws IOException {
203     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
204     int i=0;
205     for(byte [] family : families) {
206       HColumnDescriptor hcd = new HColumnDescriptor(family);
207       hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
208       htd.addFamily(hcd);
209     }
210     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
211     region = TEST_UTIL.createLocalHRegion(info, htd);
212   }
213 
214   /**
215    * A thread that makes a few increment calls
216    */
217   public static class Incrementer extends Thread {
218 
219     private final HRegion region;
220     private final int numIncrements;
221     private final int amount;
222 
223 
224     public Incrementer(HRegion region,
225         int threadNumber, int amount, int numIncrements) {
226       this.region = region;
227       this.numIncrements = numIncrements;
228       this.amount = amount;
229       setDaemon(true);
230     }
231 
232     @Override
233     public void run() {
234       for (int i=0; i<numIncrements; i++) {
235         try {
236           Increment inc = new Increment(row);
237           inc.addColumn(fam1, qual1, amount);
238           inc.addColumn(fam1, qual2, amount*2);
239           inc.addColumn(fam2, qual3, amount*3);
240           region.increment(inc);
241 
242           // verify: Make sure we only see completed increments
243           Get g = new Get(row);
244           Result result = region.get(g);
245           assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); 
246           assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
247         } catch (IOException e) {
248           e.printStackTrace();
249         }
250       }
251     }
252   }
253 
254   @Test
255   public void testAppendMultiThreads() throws IOException {
256     LOG.info("Starting test testAppendMultiThreads");
257     // run a with mixed column families (1 and 3 versions)
258     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
259 
260     int numThreads = 100;
261     int opsPerThread = 100;
262     AtomicOperation[] all = new AtomicOperation[numThreads];
263     final byte[] val = new byte[]{1};
264 
265     AtomicInteger failures = new AtomicInteger(0);
266     // create all threads
267     for (int i = 0; i < numThreads; i++) {
268       all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
269         @Override
270         public void run() {
271           for (int i=0; i<numOps; i++) {
272             try {
273               Append a = new Append(row);
274               a.add(fam1, qual1, val);
275               a.add(fam1, qual2, val);
276               a.add(fam2, qual3, val);
277               region.append(a);
278 
279               Get g = new Get(row);
280               Result result = region.get(g);
281               assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); 
282               assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); 
283             } catch (IOException e) {
284               e.printStackTrace();
285               failures.incrementAndGet();
286               fail();
287             }
288           }
289         }
290       };
291     }
292 
293     // run all threads
294     for (int i = 0; i < numThreads; i++) {
295       all[i].start();
296     }
297 
298     // wait for all threads to finish
299     for (int i = 0; i < numThreads; i++) {
300       try {
301         all[i].join();
302       } catch (InterruptedException e) {
303       }
304     }
305     assertEquals(0, failures.get());
306     Get g = new Get(row);
307     Result result = region.get(g);
308     assertEquals(result.getValue(fam1, qual1).length, 10000);
309     assertEquals(result.getValue(fam1, qual2).length, 10000);
310     assertEquals(result.getValue(fam2, qual3).length, 10000);
311   }
312   /**
313    * Test multi-threaded row mutations.
314    */
315   @Test
316   public void testRowMutationMultiThreads() throws IOException {
317 
318     LOG.info("Starting test testRowMutationMultiThreads");
319     initHRegion(tableName, name.getMethodName(), fam1);
320 
321     // create 10 threads, each will alternate between adding and
322     // removing a column
323     int numThreads = 10;
324     int opsPerThread = 500;
325     AtomicOperation[] all = new AtomicOperation[numThreads];
326 
327     AtomicLong timeStamps = new AtomicLong(0);
328     AtomicInteger failures = new AtomicInteger(0);
329     // create all threads
330     for (int i = 0; i < numThreads; i++) {
331       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
332         @Override
333         public void run() {
334           boolean op = true;
335           for (int i=0; i<numOps; i++) {
336             try {
337               // throw in some flushes
338               if (i%10==0) {
339                 synchronized(region) {
340                   LOG.debug("flushing");
341                   region.flushcache();
342                   if (i%100==0) {
343                     region.compactStores();
344                   }
345                 }
346               }
347               long ts = timeStamps.incrementAndGet();
348               RowMutations rm = new RowMutations(row);
349               if (op) {
350                 Put p = new Put(row, ts);
351                 p.add(fam1, qual1, value1);
352                 rm.add(p);
353                 Delete d = new Delete(row);
354                 d.deleteColumns(fam1, qual2, ts);
355                 rm.add(d);
356               } else {
357                 Delete d = new Delete(row);
358                 d.deleteColumns(fam1, qual1, ts);
359                 rm.add(d);
360                 Put p = new Put(row, ts);
361                 p.add(fam1, qual2, value2);
362                 rm.add(p);
363               }
364               region.mutateRow(rm);
365               op ^= true;
366               // check: should always see exactly one column
367               Get g = new Get(row);
368               Result r = region.get(g);
369               if (r.size() != 1) {
370                 LOG.debug(r);
371                 failures.incrementAndGet();
372                 fail();
373               }
374             } catch (IOException e) {
375               e.printStackTrace();
376               failures.incrementAndGet();
377               fail();
378             }
379           }
380         }
381       };
382     }
383 
384     // run all threads
385     for (int i = 0; i < numThreads; i++) {
386       all[i].start();
387     }
388 
389     // wait for all threads to finish
390     for (int i = 0; i < numThreads; i++) {
391       try {
392         all[i].join();
393       } catch (InterruptedException e) {
394       }
395     }
396     assertEquals(0, failures.get());
397   }
398 
399 
400   /**
401    * Test multi-threaded region mutations.
402    */
403   @Test
404   public void testMultiRowMutationMultiThreads() throws IOException {
405 
406     LOG.info("Starting test testMultiRowMutationMultiThreads");
407     initHRegion(tableName, name.getMethodName(), fam1);
408 
409     // create 10 threads, each will alternate between adding and
410     // removing a column
411     int numThreads = 10;
412     int opsPerThread = 500;
413     AtomicOperation[] all = new AtomicOperation[numThreads];
414 
415     AtomicLong timeStamps = new AtomicLong(0);
416     AtomicInteger failures = new AtomicInteger(0);
417     final List<byte[]> rowsToLock = Arrays.asList(row, row2);
418     // create all threads
419     for (int i = 0; i < numThreads; i++) {
420       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
421         @Override
422         public void run() {
423           boolean op = true;
424           for (int i=0; i<numOps; i++) {
425             try {
426               // throw in some flushes
427               if (i%10==0) {
428                 synchronized(region) {
429                   LOG.debug("flushing");
430                   region.flushcache();
431                   if (i%100==0) {
432                     region.compactStores();
433                   }
434                 }
435               }
436               long ts = timeStamps.incrementAndGet();
437               List<Mutation> mrm = new ArrayList<Mutation>();
438               if (op) {
439                 Put p = new Put(row2, ts);
440                 p.add(fam1, qual1, value1);
441                 mrm.add(p);
442                 Delete d = new Delete(row);
443                 d.deleteColumns(fam1, qual1, ts);
444                 mrm.add(d);
445               } else {
446                 Delete d = new Delete(row2);
447                 d.deleteColumns(fam1, qual1, ts);
448                 mrm.add(d);
449                 Put p = new Put(row, ts);
450                 p.add(fam1, qual1, value2);
451                 mrm.add(p);
452               }
453               region.mutateRowsWithLocks(mrm, rowsToLock);
454               op ^= true;
455               // check: should always see exactly one column
456               Scan s = new Scan(row);
457               RegionScanner rs = region.getScanner(s);
458               List<Cell> r = new ArrayList<Cell>();
459               while(rs.next(r));
460               rs.close();
461               if (r.size() != 1) {
462                 LOG.debug(r);
463                 failures.incrementAndGet();
464                 fail();
465               }
466             } catch (IOException e) {
467               e.printStackTrace();
468               failures.incrementAndGet();
469               fail();
470             }
471           }
472         }
473       };
474     }
475 
476     // run all threads
477     for (int i = 0; i < numThreads; i++) {
478       all[i].start();
479     }
480 
481     // wait for all threads to finish
482     for (int i = 0; i < numThreads; i++) {
483       try {
484         all[i].join();
485       } catch (InterruptedException e) {
486       }
487     }
488     assertEquals(0, failures.get());
489   }
490 
491   public static class AtomicOperation extends Thread {
492     protected final HRegion region;
493     protected final int numOps;
494     protected final AtomicLong timeStamps;
495     protected final AtomicInteger failures;
496     protected final Random r = new Random();
497 
498     public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
499         AtomicInteger failures) {
500       this.region = region;
501       this.numOps = numOps;
502       this.timeStamps = timeStamps;
503       this.failures = failures;
504     }
505   }
506   
507   private static CountDownLatch latch = new CountDownLatch(1);
508   private enum TestStep {
509     INIT,                  // initial put of 10 to set value of the cell
510     PUT_STARTED,           // began doing a put of 50 to cell
511     PUT_COMPLETED,         // put complete (released RowLock, but may not have advanced MVCC).
512     CHECKANDPUT_STARTED,   // began checkAndPut: if 10 -> 11
513     CHECKANDPUT_COMPLETED  // completed checkAndPut
514     // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
515   }
516   private static volatile TestStep testStep = TestStep.INIT;
517   private final String family = "f1";
518      
519   /**
520    * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
521    * MVCC. 
522    * 
523    * Moved into TestAtomicOperation from its original location, TestHBase7051
524    */
525   @Test
526   public void testPutAndCheckAndPutInParallel() throws Exception {
527 
528     final String tableName = "testPutAndCheckAndPut";
529     Configuration conf = TEST_UTIL.getConfiguration();
530     conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
531     final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
532         null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
533 
534     Put[] puts = new Put[1];
535     Put put = new Put(Bytes.toBytes("r1"));
536     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
537     puts[0] = put;
538     
539     region.batchMutate(puts);
540     MultithreadedTestUtil.TestContext ctx =
541       new MultithreadedTestUtil.TestContext(conf);
542     ctx.addThread(new PutThread(ctx, region));
543     ctx.addThread(new CheckAndPutThread(ctx, region));
544     ctx.startThreads();
545     while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
546       Thread.sleep(100);
547     }
548     ctx.stop();
549     Scan s = new Scan();
550     RegionScanner scanner = region.getScanner(s);
551     List<Cell> results = new ArrayList<Cell>();
552     scanner.next(results, 2);
553     for (Cell keyValue : results) {
554       assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
555     }
556 
557   }
558 
559   private class PutThread extends TestThread {
560     private MockHRegion region;
561     PutThread(TestContext ctx, MockHRegion region) {
562       super(ctx);
563       this.region = region;
564     }
565 
566     public void doWork() throws Exception {
567       Put[] puts = new Put[1];
568       Put put = new Put(Bytes.toBytes("r1"));
569       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
570       puts[0] = put;
571       testStep = TestStep.PUT_STARTED;
572       region.batchMutate(puts);
573     }
574   }
575 
576   private class CheckAndPutThread extends TestThread {
577     private MockHRegion region;
578     CheckAndPutThread(TestContext ctx, MockHRegion region) {
579       super(ctx);
580       this.region = region;
581    }
582 
583     public void doWork() throws Exception {
584       Put[] puts = new Put[1];
585       Put put = new Put(Bytes.toBytes("r1"));
586       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
587       puts[0] = put;
588       while (testStep != TestStep.PUT_COMPLETED) {
589         Thread.sleep(100);
590       }
591       testStep = TestStep.CHECKANDPUT_STARTED;
592       region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
593         CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
594       testStep = TestStep.CHECKANDPUT_COMPLETED;
595     }
596   }
597 
  /**
   * HRegion subclass used by testPutAndCheckAndPutInParallel. It wraps every
   * row lock in a {@link WrappedRowLock} whose release behaviour depends on
   * the current {@code testStep}, and counts down {@code latch} when a row
   * lock is requested while the step is CHECKANDPUT_STARTED.
   */
  public static class MockHRegion extends HRegion {

    /** Passes all arguments straight through to the HRegion constructor. */
    public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
        final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    /**
     * Obtains the row lock from the superclass and returns it wrapped in a
     * {@link WrappedRowLock}. If the step is CHECKANDPUT_STARTED, the latch is
     * counted down first, which lets a put thread waiting in
     * {@link WrappedRowLock#release()} continue.
     */
    @Override
    public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLock(row, waitForLock));
    }
    
    /**
     * Row lock whose {@link #release()} is driven by {@code testStep}, so the
     * test can hold the put thread back after it has released its lock.
     */
    public class WrappedRowLock extends RowLock {

      /** Builds the wrapper from the context of the given lock. */
      private WrappedRowLock(RowLock rowLock) {
        super(rowLock.context);
      }

      /**
       * Releases the lock according to the current step:
       * INIT and CHECKANDPUT_STARTED release immediately; PUT_STARTED moves
       * the step to PUT_COMPLETED, releases, then blocks until the latch is
       * counted down and sleeps one more second.
       * NOTE(review): for PUT_COMPLETED and CHECKANDPUT_COMPLETED no branch
       * matches, so super.release() is not called; confirm this is intended.
       */
      @Override
      public void release() {
        if (testStep == TestStep.INIT) {
          super.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            // Publish the new step before releasing; CheckAndPutThread polls for it.
            testStep = TestStep.PUT_COMPLETED;
            super.release();
            // put has been written to the memstore and the row lock has been released, but the
            // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
            // operations would cause the non-atomicity to show up:
            // 1) Put releases row lock (where we are now)
            // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
            //    because the MVCC has not advanced
            // 3) Put advances MVCC
            // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
            // (see below), and then wait some more to give the checkAndPut time to read the old
            // value.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Restore the interrupt status for the caller.
            Thread.currentThread().interrupt();
          }
        }
        else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          super.release();
        }
      }
    }
  }
652 }