1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.hbase.SmallTests;
22 import org.junit.experimental.categories.Category;
23
24 import java.util.Random;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicLong;
27
28
29
30
31
32 @Category(SmallTests.class)
33 public class TestMultiVersionConsistencyControl extends TestCase {
34 static class Writer implements Runnable {
35 final AtomicBoolean finished;
36 final MultiVersionConsistencyControl mvcc;
37 final AtomicBoolean status;
38
39 Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
40 this.finished = finished;
41 this.mvcc = mvcc;
42 this.status = status;
43 }
44
45 private Random rnd = new Random();
46 public boolean failed = false;
47
48 public void run() {
49 while (!finished.get()) {
50 MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
51
52
53 int sleepTime = rnd.nextInt(500);
54
55
56 try {
57 if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
58 } catch (InterruptedException e1) {
59 }
60 try {
61 mvcc.completeMemstoreInsert(e);
62 } catch (RuntimeException ex) {
63
64 System.out.println(ex.toString());
65 ex.printStackTrace();
66 status.set(false);
67 return;
68
69 }
70 }
71 }
72 }
73
74 public void testParallelism() throws Exception {
75 final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
76
77 final AtomicBoolean finished = new AtomicBoolean(false);
78
79
80 final AtomicBoolean readerFailed = new AtomicBoolean(false);
81 final AtomicLong failedAt = new AtomicLong();
82 Runnable reader = new Runnable() {
83 public void run() {
84 long prev = mvcc.memstoreReadPoint();
85 while (!finished.get()) {
86 long newPrev = mvcc.memstoreReadPoint();
87 if (newPrev < prev) {
88
89 System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
90 readerFailed.set(true);
91
92 failedAt.set(newPrev);
93 return;
94 }
95 }
96 }
97 };
98
99
100 int n = 20;
101 Thread[] writers = new Thread[n];
102 AtomicBoolean[] statuses = new AtomicBoolean[n];
103 Thread readThread = new Thread(reader);
104
105 for (int i = 0; i < n; ++i) {
106 statuses[i] = new AtomicBoolean(true);
107 writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
108 writers[i].start();
109 }
110 readThread.start();
111
112 try {
113 Thread.sleep(10 * 1000);
114 } catch (InterruptedException ex) {
115 }
116
117 finished.set(true);
118
119 readThread.join();
120 for (int i = 0; i < n; ++i) {
121 writers[i].join();
122 }
123
124
125 assertFalse(readerFailed.get());
126 for (int i = 0; i < n; ++i) {
127 assertTrue(statuses[i].get());
128 }
129
130 }
131
132 }