1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
22 import static org.junit.Assert.*;
23
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Chore;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.SmallTests;
31 import org.apache.hadoop.hbase.Stoppable;
32 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
34 import org.apache.hadoop.hbase.util.Threads;
35 import org.junit.Test;
36 import org.junit.experimental.categories.Category;
37 import org.mockito.Mockito;
38 import org.mockito.invocation.InvocationOnMock;
39 import org.mockito.stubbing.Answer;
40
41 @Category(SmallTests.class)
42 public class TestServerNonceManager {
43
44 @Test
45 public void testNormalStartEnd() throws Exception {
46 final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
47 ServerNonceManager nm = createManager();
48 for (int i = 0; i < numbers.length; ++i) {
49 for (int j = 0; j < numbers.length; ++j) {
50 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
51 }
52 }
53
54 for (int i = 0; i < numbers.length; ++i) {
55 assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
56 }
57
58 for (int i = 0; i < numbers.length; ++i) {
59 for (int j = 0; j < numbers.length; ++j) {
60 nm.endOperation(numbers[i], numbers[j], false);
61 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
62 }
63 }
64
65 for (int i = 0; i < numbers.length; ++i) {
66 for (int j = 0; j < numbers.length; ++j) {
67 nm.endOperation(numbers[i], numbers[j], true);
68 assertEquals(numbers[j] == NO_NONCE,
69 nm.startOperation(numbers[i], numbers[j], createStoppable()));
70 }
71 }
72 }
73
74 @Test
75 public void testNoEndWithoutStart() {
76 ServerNonceManager nm = createManager();
77 try {
78 nm.endOperation(NO_NONCE, 1, true);
79 fail("Should have thrown");
80 } catch (AssertionError err) {}
81 }
82
83 @Test
84 public void testCleanup() throws Exception {
85 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
86 EnvironmentEdgeManager.injectEdge(edge);
87 try {
88 ServerNonceManager nm = createManager(6);
89 Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class));
90 edge.setValue(1);
91 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
92 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
93 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
94 edge.setValue(2);
95 nm.endOperation(NO_NONCE, 1, true);
96 edge.setValue(4);
97 nm.endOperation(NO_NONCE, 2, true);
98 edge.setValue(9);
99 cleanup.choreForTesting();
100
101 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
102
103 assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
104
105 nm.endOperation(NO_NONCE, 3, false);
106 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
107 edge.setValue(11);
108 cleanup.choreForTesting();
109
110 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
111 } finally {
112 EnvironmentEdgeManager.reset();
113 }
114 }
115
116 @Test
117 public void testWalNonces() throws Exception {
118 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
119 EnvironmentEdgeManager.injectEdge(edge);
120 try {
121 ServerNonceManager nm = createManager(6);
122 Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class));
123
124 edge.setValue(12);
125 nm.reportOperationFromWal(NO_NONCE, 1, 8);
126 nm.reportOperationFromWal(NO_NONCE, 2, 2);
127 nm.reportOperationFromWal(NO_NONCE, 3, 5);
128 nm.reportOperationFromWal(NO_NONCE, 3, 6);
129
130 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
131
132 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
133 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
134
135 edge.setValue(17);
136 cleanup.choreForTesting();
137 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
138 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
139 edge.setValue(19);
140 cleanup.choreForTesting();
141 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
142 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
143 } finally {
144 EnvironmentEdgeManager.reset();
145 }
146 }
147
148 @Test
149 public void testConcurrentAttempts() throws Exception {
150 final ServerNonceManager nm = createManager();
151
152 nm.startOperation(NO_NONCE, 1, createStoppable());
153 TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
154 Thread t = tr.start();
155 waitForThreadToBlockOrExit(t);
156 nm.endOperation(NO_NONCE, 1, true);
157 t.join();
158 tr.propagateError();
159
160 nm.startOperation(NO_NONCE, 2, createStoppable());
161 tr = new TestRunnable(nm, 2, true, createStoppable());
162 t = tr.start();
163 waitForThreadToBlockOrExit(t);
164 nm.endOperation(NO_NONCE, 2, false);
165 t.join();
166 tr.propagateError();
167 nm.endOperation(NO_NONCE, 2, true);
168
169 nm.startOperation(NO_NONCE, 3, createStoppable());
170 tr = new TestRunnable(nm, 4, true, createStoppable());
171 tr.start().join();
172 tr.propagateError();
173 }
174
175 @Test
176 public void testStopWaiting() throws Exception {
177 final ServerNonceManager nm = createManager();
178 nm.setConflictWaitIterationMs(1);
179 Stoppable stoppingStoppable = createStoppable();
180 Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
181 AtomicInteger answer = new AtomicInteger(3);
182 @Override
183 public Boolean answer(InvocationOnMock invocation) throws Throwable {
184 return 0 < answer.decrementAndGet();
185 }
186 });
187
188 nm.startOperation(NO_NONCE, 1, createStoppable());
189 TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
190 Thread t = tr.start();
191 waitForThreadToBlockOrExit(t);
192
193 t.join();
194 tr.propagateError();
195 }
196
197 private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
198 for (int i = 9; i >= 0; --i) {
199 if (t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
200 || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED) {
201 return;
202 }
203 if (i > 0) Thread.sleep(300);
204 }
205
206
207 }
208
209 private static class TestRunnable implements Runnable {
210 public final CountDownLatch startedLatch = new CountDownLatch(1);
211
212 private final ServerNonceManager nm;
213 private final long nonce;
214 private final Boolean expected;
215 private final Stoppable stoppable;
216
217 private Throwable throwable = null;
218
219 public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
220 this.nm = nm;
221 this.nonce = nonce;
222 this.expected = expected;
223 this.stoppable = stoppable;
224 }
225
226 public void propagateError() throws Exception {
227 if (throwable == null) return;
228 throw new Exception(throwable);
229 }
230
231 public Thread start() {
232 Thread t = new Thread(this);
233 t = Threads.setDaemonThreadRunning(t);
234 try {
235 startedLatch.await();
236 } catch (InterruptedException e) {
237 fail("Unexpected");
238 }
239 return t;
240 }
241
242 @Override
243 public void run() {
244 startedLatch.countDown();
245 boolean shouldThrow = expected == null;
246 boolean hasThrown = true;
247 try {
248 boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
249 hasThrown = false;
250 if (!shouldThrow) {
251 assertEquals(expected.booleanValue(), result);
252 }
253 } catch (Throwable t) {
254 if (!shouldThrow) {
255 throwable = t;
256 }
257 }
258 if (shouldThrow && !hasThrown) {
259 throwable = new AssertionError("Should have thrown");
260 }
261 }
262 }
263
264 private Stoppable createStoppable() {
265 Stoppable s = Mockito.mock(Stoppable.class);
266 Mockito.when(s.isStopped()).thenReturn(false);
267 return s;
268 }
269
270 private ServerNonceManager createManager() {
271 return createManager(null);
272 }
273
274 private ServerNonceManager createManager(Integer gracePeriod) {
275 Configuration conf = HBaseConfiguration.create();
276 if (gracePeriod != null) {
277 conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
278 }
279 return new ServerNonceManager(conf);
280 }
281 }