1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.fail;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.MediumTests;
47 import org.apache.hadoop.hbase.MultithreadedTestUtil;
48 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
49 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.client.Append;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Durability;
54 import org.apache.hadoop.hbase.client.Get;
55 import org.apache.hadoop.hbase.client.Increment;
56 import org.apache.hadoop.hbase.client.Mutation;
57 import org.apache.hadoop.hbase.client.Put;
58 import org.apache.hadoop.hbase.client.Result;
59 import org.apache.hadoop.hbase.client.RowMutations;
60 import org.apache.hadoop.hbase.client.Scan;
61 import org.apache.hadoop.hbase.filter.BinaryComparator;
62 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
63 import org.apache.hadoop.hbase.io.HeapSize;
64 import org.apache.hadoop.hbase.regionserver.wal.HLog;
65 import org.apache.hadoop.hbase.util.Bytes;
66 import org.junit.After;
67 import org.junit.Before;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.TestName;
72
73
74
75
76
77
78 @Category(MediumTests.class)
79 public class TestAtomicOperation {
80 static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
81 @Rule public TestName name = new TestName();
82
83 HRegion region = null;
84 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
85
86
87 static byte[] tableName;
88 static final byte[] qual1 = Bytes.toBytes("qual1");
89 static final byte[] qual2 = Bytes.toBytes("qual2");
90 static final byte[] qual3 = Bytes.toBytes("qual3");
91 static final byte[] value1 = Bytes.toBytes("value1");
92 static final byte[] value2 = Bytes.toBytes("value2");
93 static final byte [] row = Bytes.toBytes("rowA");
94 static final byte [] row2 = Bytes.toBytes("rowB");
95
96 @Before
97 public void setup() {
98 tableName = Bytes.toBytes(name.getMethodName());
99 }
100
101 @After
102 public void teardown() throws IOException {
103 if (region != null) {
104 region.close();
105 region = null;
106 }
107 }
108
109
110
111
112
113
114
115
116
117
118 @Test
119 public void testAppend() throws IOException {
120 initHRegion(tableName, name.getMethodName(), fam1);
121 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
122 " The Universe, and Everything";
123 String v2 = " is... 42.";
124 Append a = new Append(row);
125 a.setReturnResults(false);
126 a.add(fam1, qual1, Bytes.toBytes(v1));
127 a.add(fam1, qual2, Bytes.toBytes(v2));
128 assertNull(region.append(a));
129 a = new Append(row);
130 a.add(fam1, qual1, Bytes.toBytes(v2));
131 a.add(fam1, qual2, Bytes.toBytes(v1));
132 Result result = region.append(a);
133 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
134 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
135 }
136
137
138
139
140 @Test
141 public void testIncrementMultiThreads() throws IOException {
142
143 LOG.info("Starting test testIncrementMultiThreads");
144
145 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
146
147
148 int numThreads = 100;
149 int incrementsPerThread = 1000;
150 Incrementer[] all = new Incrementer[numThreads];
151 int expectedTotal = 0;
152
153
154 for (int i = 0; i < numThreads; i++) {
155 all[i] = new Incrementer(region, i, i, incrementsPerThread);
156 expectedTotal += (i * incrementsPerThread);
157 }
158
159
160 for (int i = 0; i < numThreads; i++) {
161 all[i].start();
162 }
163
164
165 for (int i = 0; i < numThreads; i++) {
166 try {
167 all[i].join();
168 } catch (InterruptedException e) {
169 }
170 }
171 assertICV(row, fam1, qual1, expectedTotal);
172 assertICV(row, fam1, qual2, expectedTotal*2);
173 assertICV(row, fam2, qual3, expectedTotal*3);
174 LOG.info("testIncrementMultiThreads successfully verified that total is " +
175 expectedTotal);
176 }
177
178
179 private void assertICV(byte [] row,
180 byte [] familiy,
181 byte[] qualifier,
182 long amount) throws IOException {
183
184 Get get = new Get(row);
185 get.addColumn(familiy, qualifier);
186 Result result = region.get(get);
187 assertEquals(1, result.size());
188
189 Cell kv = result.rawCells()[0];
190 long r = Bytes.toLong(CellUtil.cloneValue(kv));
191 assertEquals(amount, r);
192 }
193
194 private void initHRegion (byte [] tableName, String callingMethod,
195 byte[] ... families)
196 throws IOException {
197 initHRegion(tableName, callingMethod, null, families);
198 }
199
200 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
201 byte[] ... families)
202 throws IOException {
203 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
204 int i=0;
205 for(byte [] family : families) {
206 HColumnDescriptor hcd = new HColumnDescriptor(family);
207 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
208 htd.addFamily(hcd);
209 }
210 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
211 region = TEST_UTIL.createLocalHRegion(info, htd);
212 }
213
214
215
216
217 public static class Incrementer extends Thread {
218
219 private final HRegion region;
220 private final int numIncrements;
221 private final int amount;
222
223
224 public Incrementer(HRegion region,
225 int threadNumber, int amount, int numIncrements) {
226 this.region = region;
227 this.numIncrements = numIncrements;
228 this.amount = amount;
229 setDaemon(true);
230 }
231
232 @Override
233 public void run() {
234 for (int i=0; i<numIncrements; i++) {
235 try {
236 Increment inc = new Increment(row);
237 inc.addColumn(fam1, qual1, amount);
238 inc.addColumn(fam1, qual2, amount*2);
239 inc.addColumn(fam2, qual3, amount*3);
240 region.increment(inc);
241
242
243 Get g = new Get(row);
244 Result result = region.get(g);
245 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2)));
246 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
247 } catch (IOException e) {
248 e.printStackTrace();
249 }
250 }
251 }
252 }
253
254 @Test
255 public void testAppendMultiThreads() throws IOException {
256 LOG.info("Starting test testAppendMultiThreads");
257
258 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
259
260 int numThreads = 100;
261 int opsPerThread = 100;
262 AtomicOperation[] all = new AtomicOperation[numThreads];
263 final byte[] val = new byte[]{1};
264
265 AtomicInteger failures = new AtomicInteger(0);
266
267 for (int i = 0; i < numThreads; i++) {
268 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
269 @Override
270 public void run() {
271 for (int i=0; i<numOps; i++) {
272 try {
273 Append a = new Append(row);
274 a.add(fam1, qual1, val);
275 a.add(fam1, qual2, val);
276 a.add(fam2, qual3, val);
277 region.append(a);
278
279 Get g = new Get(row);
280 Result result = region.get(g);
281 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
282 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
283 } catch (IOException e) {
284 e.printStackTrace();
285 failures.incrementAndGet();
286 fail();
287 }
288 }
289 }
290 };
291 }
292
293
294 for (int i = 0; i < numThreads; i++) {
295 all[i].start();
296 }
297
298
299 for (int i = 0; i < numThreads; i++) {
300 try {
301 all[i].join();
302 } catch (InterruptedException e) {
303 }
304 }
305 assertEquals(0, failures.get());
306 Get g = new Get(row);
307 Result result = region.get(g);
308 assertEquals(result.getValue(fam1, qual1).length, 10000);
309 assertEquals(result.getValue(fam1, qual2).length, 10000);
310 assertEquals(result.getValue(fam2, qual3).length, 10000);
311 }
312
313
314
315 @Test
316 public void testRowMutationMultiThreads() throws IOException {
317
318 LOG.info("Starting test testRowMutationMultiThreads");
319 initHRegion(tableName, name.getMethodName(), fam1);
320
321
322
323 int numThreads = 10;
324 int opsPerThread = 500;
325 AtomicOperation[] all = new AtomicOperation[numThreads];
326
327 AtomicLong timeStamps = new AtomicLong(0);
328 AtomicInteger failures = new AtomicInteger(0);
329
330 for (int i = 0; i < numThreads; i++) {
331 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
332 @Override
333 public void run() {
334 boolean op = true;
335 for (int i=0; i<numOps; i++) {
336 try {
337
338 if (i%10==0) {
339 synchronized(region) {
340 LOG.debug("flushing");
341 region.flushcache();
342 if (i%100==0) {
343 region.compactStores();
344 }
345 }
346 }
347 long ts = timeStamps.incrementAndGet();
348 RowMutations rm = new RowMutations(row);
349 if (op) {
350 Put p = new Put(row, ts);
351 p.add(fam1, qual1, value1);
352 rm.add(p);
353 Delete d = new Delete(row);
354 d.deleteColumns(fam1, qual2, ts);
355 rm.add(d);
356 } else {
357 Delete d = new Delete(row);
358 d.deleteColumns(fam1, qual1, ts);
359 rm.add(d);
360 Put p = new Put(row, ts);
361 p.add(fam1, qual2, value2);
362 rm.add(p);
363 }
364 region.mutateRow(rm);
365 op ^= true;
366
367 Get g = new Get(row);
368 Result r = region.get(g);
369 if (r.size() != 1) {
370 LOG.debug(r);
371 failures.incrementAndGet();
372 fail();
373 }
374 } catch (IOException e) {
375 e.printStackTrace();
376 failures.incrementAndGet();
377 fail();
378 }
379 }
380 }
381 };
382 }
383
384
385 for (int i = 0; i < numThreads; i++) {
386 all[i].start();
387 }
388
389
390 for (int i = 0; i < numThreads; i++) {
391 try {
392 all[i].join();
393 } catch (InterruptedException e) {
394 }
395 }
396 assertEquals(0, failures.get());
397 }
398
399
400
401
402
403 @Test
404 public void testMultiRowMutationMultiThreads() throws IOException {
405
406 LOG.info("Starting test testMultiRowMutationMultiThreads");
407 initHRegion(tableName, name.getMethodName(), fam1);
408
409
410
411 int numThreads = 10;
412 int opsPerThread = 500;
413 AtomicOperation[] all = new AtomicOperation[numThreads];
414
415 AtomicLong timeStamps = new AtomicLong(0);
416 AtomicInteger failures = new AtomicInteger(0);
417 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
418
419 for (int i = 0; i < numThreads; i++) {
420 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
421 @Override
422 public void run() {
423 boolean op = true;
424 for (int i=0; i<numOps; i++) {
425 try {
426
427 if (i%10==0) {
428 synchronized(region) {
429 LOG.debug("flushing");
430 region.flushcache();
431 if (i%100==0) {
432 region.compactStores();
433 }
434 }
435 }
436 long ts = timeStamps.incrementAndGet();
437 List<Mutation> mrm = new ArrayList<Mutation>();
438 if (op) {
439 Put p = new Put(row2, ts);
440 p.add(fam1, qual1, value1);
441 mrm.add(p);
442 Delete d = new Delete(row);
443 d.deleteColumns(fam1, qual1, ts);
444 mrm.add(d);
445 } else {
446 Delete d = new Delete(row2);
447 d.deleteColumns(fam1, qual1, ts);
448 mrm.add(d);
449 Put p = new Put(row, ts);
450 p.add(fam1, qual1, value2);
451 mrm.add(p);
452 }
453 region.mutateRowsWithLocks(mrm, rowsToLock);
454 op ^= true;
455
456 Scan s = new Scan(row);
457 RegionScanner rs = region.getScanner(s);
458 List<Cell> r = new ArrayList<Cell>();
459 while(rs.next(r));
460 rs.close();
461 if (r.size() != 1) {
462 LOG.debug(r);
463 failures.incrementAndGet();
464 fail();
465 }
466 } catch (IOException e) {
467 e.printStackTrace();
468 failures.incrementAndGet();
469 fail();
470 }
471 }
472 }
473 };
474 }
475
476
477 for (int i = 0; i < numThreads; i++) {
478 all[i].start();
479 }
480
481
482 for (int i = 0; i < numThreads; i++) {
483 try {
484 all[i].join();
485 } catch (InterruptedException e) {
486 }
487 }
488 assertEquals(0, failures.get());
489 }
490
491 public static class AtomicOperation extends Thread {
492 protected final HRegion region;
493 protected final int numOps;
494 protected final AtomicLong timeStamps;
495 protected final AtomicInteger failures;
496 protected final Random r = new Random();
497
498 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
499 AtomicInteger failures) {
500 this.region = region;
501 this.numOps = numOps;
502 this.timeStamps = timeStamps;
503 this.failures = failures;
504 }
505 }
506
507 private static CountDownLatch latch = new CountDownLatch(1);
508 private enum TestStep {
509 INIT,
510 PUT_STARTED,
511 PUT_COMPLETED,
512 CHECKANDPUT_STARTED,
513 CHECKANDPUT_COMPLETED
514
515 }
516 private static volatile TestStep testStep = TestStep.INIT;
517 private final String family = "f1";
518
519
520
521
522
523
524
525 @Test
526 public void testPutAndCheckAndPutInParallel() throws Exception {
527
528 final String tableName = "testPutAndCheckAndPut";
529 Configuration conf = TEST_UTIL.getConfiguration();
530 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
531 final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
532 null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
533
534 Put[] puts = new Put[1];
535 Put put = new Put(Bytes.toBytes("r1"));
536 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
537 puts[0] = put;
538
539 region.batchMutate(puts);
540 MultithreadedTestUtil.TestContext ctx =
541 new MultithreadedTestUtil.TestContext(conf);
542 ctx.addThread(new PutThread(ctx, region));
543 ctx.addThread(new CheckAndPutThread(ctx, region));
544 ctx.startThreads();
545 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
546 Thread.sleep(100);
547 }
548 ctx.stop();
549 Scan s = new Scan();
550 RegionScanner scanner = region.getScanner(s);
551 List<Cell> results = new ArrayList<Cell>();
552 scanner.next(results, 2);
553 for (Cell keyValue : results) {
554 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
555 }
556
557 }
558
559 private class PutThread extends TestThread {
560 private MockHRegion region;
561 PutThread(TestContext ctx, MockHRegion region) {
562 super(ctx);
563 this.region = region;
564 }
565
566 public void doWork() throws Exception {
567 Put[] puts = new Put[1];
568 Put put = new Put(Bytes.toBytes("r1"));
569 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
570 puts[0] = put;
571 testStep = TestStep.PUT_STARTED;
572 region.batchMutate(puts);
573 }
574 }
575
576 private class CheckAndPutThread extends TestThread {
577 private MockHRegion region;
578 CheckAndPutThread(TestContext ctx, MockHRegion region) {
579 super(ctx);
580 this.region = region;
581 }
582
583 public void doWork() throws Exception {
584 Put[] puts = new Put[1];
585 Put put = new Put(Bytes.toBytes("r1"));
586 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
587 puts[0] = put;
588 while (testStep != TestStep.PUT_COMPLETED) {
589 Thread.sleep(100);
590 }
591 testStep = TestStep.CHECKANDPUT_STARTED;
592 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
593 CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
594 testStep = TestStep.CHECKANDPUT_COMPLETED;
595 }
596 }
597
598 public static class MockHRegion extends HRegion {
599
600 public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
601 final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
602 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
603 }
604
605 @Override
606 public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
607 if (testStep == TestStep.CHECKANDPUT_STARTED) {
608 latch.countDown();
609 }
610 return new WrappedRowLock(super.getRowLock(row, waitForLock));
611 }
612
613 public class WrappedRowLock extends RowLock {
614
615 private WrappedRowLock(RowLock rowLock) {
616 super(rowLock.context);
617 }
618
619 @Override
620 public void release() {
621 if (testStep == TestStep.INIT) {
622 super.release();
623 return;
624 }
625
626 if (testStep == TestStep.PUT_STARTED) {
627 try {
628 testStep = TestStep.PUT_COMPLETED;
629 super.release();
630
631
632
633
634
635
636
637
638
639
640 latch.await();
641 Thread.sleep(1000);
642 } catch (InterruptedException e) {
643 Thread.currentThread().interrupt();
644 }
645 }
646 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
647 super.release();
648 }
649 }
650 }
651 }
652 }