1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.Date;
22 import java.text.SimpleDateFormat;
23 import java.util.Arrays;
24 import java.util.Comparator;
25 import java.util.Map;
26 import java.util.UUID;
27 import java.util.concurrent.ConcurrentHashMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Chore;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.Stoppable;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
37 import org.apache.hadoop.hbase.util.Pair;
38
39 import org.apache.hadoop.classification.InterfaceAudience;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class ServerNonceManager {
49 public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
50 private static final Log LOG = LogFactory.getLog(ServerNonceManager.class);
51
52
53
54 private int conflictWaitIterationMs = 30000;
55
56 private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS");
57
58
59 private static class OperationContext {
60 static final int DONT_PROCEED = 0;
61 static final int PROCEED = 1;
62 static final int WAIT = 2;
63
64
65 private long data = 0;
66 private static final long STATE_BITS = 3;
67 private static final long WAITING_BIT = 4;
68 private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
69
70 @Override
71 public String toString() {
72 return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
73 + tsFormat.format(new Date(getActivityTime())) + "]";
74 }
75
76 public OperationContext() {
77 setState(WAIT);
78 reportActivity();
79 }
80
81 public void setState(int state) {
82 this.data = (this.data & ~STATE_BITS) | state;
83 }
84
85 public int getState() {
86 return (int)(this.data & STATE_BITS);
87 }
88
89 public void setHasWait() {
90 this.data = this.data | WAITING_BIT;
91 }
92
93 public boolean hasWait() {
94 return (this.data & WAITING_BIT) == WAITING_BIT;
95 }
96
97 public void reportActivity() {
98 long now = EnvironmentEdgeManager.currentTimeMillis();
99 this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
100 }
101
102 public boolean isExpired(long minRelevantTime) {
103 return getActivityTime() < (minRelevantTime & (~0l >>> 3));
104 }
105
106 private long getActivityTime() {
107 return this.data >>> 3;
108 }
109 }
110
111
112
113
114
115 private static class NonceKey {
116 private long group;
117 private long nonce;
118
119 public NonceKey(long group, long nonce) {
120 assert nonce != HConstants.NO_NONCE;
121 this.group = group;
122 this.nonce = nonce;
123 }
124
125 @Override
126 public boolean equals(Object obj) {
127 if (obj == null || !(obj instanceof NonceKey)) return false;
128 NonceKey nk = ((NonceKey)obj);
129 return this.nonce == nk.nonce && this.group == nk.group;
130 }
131
132 @Override
133 public int hashCode() {
134 return (int)((group >> 32) ^ group ^ (nonce >> 32) ^ nonce);
135 }
136
137 @Override
138 public String toString() {
139 return "[" + group + ":" + nonce + "]";
140 }
141 }
142
143
144
145
146
147
148
149
150
151 private ConcurrentHashMap<NonceKey, OperationContext> nonces =
152 new ConcurrentHashMap<NonceKey, OperationContext>();
153
154 private int deleteNonceGracePeriod;
155
156 public ServerNonceManager(Configuration conf) {
157
158 deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
159 if (deleteNonceGracePeriod < 60 * 1000) {
160 LOG.warn("Nonce grace period " + deleteNonceGracePeriod
161 + " is less than a minute; might be too small to be useful");
162 }
163 }
164
165 @VisibleForTesting
166 public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
167 this.conflictWaitIterationMs = conflictWaitIterationMs;
168 }
169
170
171
172
173
174
175
176
177
178 public boolean startOperation(long group, long nonce, Stoppable stoppable)
179 throws InterruptedException {
180 if (nonce == HConstants.NO_NONCE) return true;
181 NonceKey nk = new NonceKey(group, nonce);
182 OperationContext ctx = new OperationContext();
183 while (true) {
184 OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
185 if (oldResult == null) return true;
186
187
188 synchronized (oldResult) {
189 int oldState = oldResult.getState();
190 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
191 if (oldState != OperationContext.WAIT) {
192 return oldState == OperationContext.PROCEED;
193 }
194 oldResult.setHasWait();
195 oldResult.wait(this.conflictWaitIterationMs);
196 if (stoppable.isStopped()) {
197 throw new InterruptedException("Server stopped");
198 }
199 }
200 }
201 }
202
203
204
205
206
207
208
209 public void endOperation(long group, long nonce, boolean success) {
210 if (nonce == HConstants.NO_NONCE) return;
211 NonceKey nk = new NonceKey(group, nonce);
212 OperationContext newResult = nonces.get(nk);
213 assert newResult != null;
214 synchronized (newResult) {
215 assert newResult.getState() == OperationContext.WAIT;
216
217 newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
218 if (success) {
219 newResult.reportActivity();
220 } else {
221 OperationContext val = nonces.remove(nk);
222 assert val == newResult;
223 }
224 if (newResult.hasWait()) {
225 LOG.debug("Conflict with running op ended: " + nk + ", " + newResult);
226 newResult.notifyAll();
227 }
228 }
229 }
230
231
232
233
234
235
236
237 public void reportOperationFromWal(long group, long nonce, long writeTime) {
238 if (nonce == HConstants.NO_NONCE) return;
239
240 long now = EnvironmentEdgeManager.currentTimeMillis();
241 if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
242 OperationContext newResult = new OperationContext();
243 newResult.setState(OperationContext.DONT_PROCEED);
244 NonceKey nk = new NonceKey(group, nonce);
245 OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
246 if (oldResult != null) {
247
248
249 LOG.warn("Nonce collision during WAL recovery: " + nk
250 + ", " + oldResult + " with " + newResult);
251 }
252 }
253
254
255
256
257
258
259 public Chore createCleanupChore(Stoppable stoppable) {
260
261 return new Chore("nonceCleaner", deleteNonceGracePeriod / 5, stoppable) {
262 @Override
263 protected void chore() {
264 cleanUpOldNonces();
265 }
266 };
267 }
268
269 private void cleanUpOldNonces() {
270 long cutoff = EnvironmentEdgeManager.currentTimeMillis() - deleteNonceGracePeriod;
271 for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
272 OperationContext oc = entry.getValue();
273 if (!oc.isExpired(cutoff)) continue;
274 synchronized (oc) {
275 if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
276 nonces.remove(entry.getKey());
277 }
278 }
279 }
280 }