1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.util.HasThread;
25
26 import java.util.ConcurrentModificationException;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Delayed;
32 import java.util.concurrent.DelayQueue;
33 import java.util.concurrent.TimeUnit;
34
35 import java.io.IOException;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class Leases extends HasThread {
58 private static final Log LOG = LogFactory.getLog(Leases.class.getName());
59 public static final int MIN_WAIT_TIME = 100;
60 private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
61
62 protected final int leaseCheckFrequency;
63 protected volatile boolean stopRequested = false;
64
65
66
67
68
69
70
71 public Leases(final int leaseCheckFrequency) {
72 this.leaseCheckFrequency = leaseCheckFrequency;
73 setDaemon(true);
74 }
75
76
77
78
79 @Override
80 public void run() {
81 long toWait = leaseCheckFrequency;
82 Lease nextLease = null;
83 long nextLeaseDelay = Long.MAX_VALUE;
84
85 while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
86
87 try {
88 if (nextLease != null) {
89 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
90 }
91
92 toWait = Math.min(leaseCheckFrequency, toWait);
93 toWait = Math.max(MIN_WAIT_TIME, toWait);
94
95 Thread.sleep(toWait);
96 } catch (InterruptedException e) {
97 continue;
98 } catch (ConcurrentModificationException e) {
99 continue;
100 } catch (Throwable e) {
101 LOG.fatal("Unexpected exception killed leases thread", e);
102 break;
103 }
104
105 nextLease = null;
106 nextLeaseDelay = Long.MAX_VALUE;
107 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
108 Map.Entry<String, Lease> entry = it.next();
109 Lease lease = entry.getValue();
110 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
111 if ( thisLeaseDelay > 0) {
112 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
113 nextLease = lease;
114 nextLeaseDelay = thisLeaseDelay;
115 }
116 } else {
117
118
119 if (lease.getListener() == null) {
120 LOG.error("lease listener is null for lease " + lease.getLeaseName());
121 } else {
122 lease.getListener().leaseExpired();
123 }
124 it.remove();
125 }
126 }
127 }
128 close();
129 }
130
131
132
133
134
135
136
137
138 public void closeAfterLeasesExpire() {
139 this.stopRequested = true;
140 }
141
142
143
144
145
146 public void close() {
147 LOG.info(Thread.currentThread().getName() + " closing leases");
148 this.stopRequested = true;
149 leases.clear();
150 LOG.info(Thread.currentThread().getName() + " closed leases");
151 }
152
153
154
155
156
157
158
159
160
161 public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162 throws LeaseStillHeldException {
163 addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
164 }
165
166
167
168
169
170
171 public void addLease(final Lease lease) throws LeaseStillHeldException {
172 if (this.stopRequested) {
173 return;
174 }
175 lease.resetExpirationTime();
176 if (leases.containsKey(lease.getLeaseName())) {
177 throw new LeaseStillHeldException(lease.getLeaseName());
178 }
179 leases.put(lease.getLeaseName(), lease);
180 }
181
182
183
184
185
186
187
188 public void renewLease(final String leaseName) throws LeaseException {
189 Lease lease = leases.get(leaseName);
190
191
192
193 if (lease == null ) {
194 throw new LeaseException("lease '" + leaseName +
195 "' does not exist or has already expired");
196 }
197 lease.resetExpirationTime();
198 }
199
200
201
202
203
204
205 public void cancelLease(final String leaseName) throws LeaseException {
206 removeLease(leaseName);
207 }
208
209
210
211
212
213
214
215
216
217
218 Lease removeLease(final String leaseName) throws LeaseException {
219 Lease lease = leases.remove(leaseName);
220 if (lease == null) {
221 throw new LeaseException("lease '" + leaseName + "' does not exist");
222 }
223 return lease;
224 }
225
226
227
228
229
230 @SuppressWarnings("serial")
231 public static class LeaseStillHeldException extends IOException {
232 private final String leaseName;
233
234
235
236
237 public LeaseStillHeldException(final String name) {
238 this.leaseName = name;
239 }
240
241
242 public String getName() {
243 return this.leaseName;
244 }
245 }
246
247
248 static class Lease implements Delayed {
249 private final String leaseName;
250 private final LeaseListener listener;
251 private int leaseTimeoutPeriod;
252 private long expirationTime;
253
254 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
255 this.leaseName = leaseName;
256 this.listener = listener;
257 this.leaseTimeoutPeriod = leaseTimeoutPeriod;
258 this.expirationTime = 0;
259 }
260
261
262 public String getLeaseName() {
263 return leaseName;
264 }
265
266
267 public LeaseListener getListener() {
268 return this.listener;
269 }
270
271 @Override
272 public boolean equals(Object obj) {
273 if (this == obj) {
274 return true;
275 }
276 if (obj == null) {
277 return false;
278 }
279 if (getClass() != obj.getClass()) {
280 return false;
281 }
282 return this.hashCode() == obj.hashCode();
283 }
284
285 @Override
286 public int hashCode() {
287 return this.leaseName.hashCode();
288 }
289
290 public long getDelay(TimeUnit unit) {
291 return unit.convert(this.expirationTime - System.currentTimeMillis(),
292 TimeUnit.MILLISECONDS);
293 }
294
295 public int compareTo(Delayed o) {
296 long delta = this.getDelay(TimeUnit.MILLISECONDS) -
297 o.getDelay(TimeUnit.MILLISECONDS);
298
299 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
300 }
301
302
303
304
305 public void resetExpirationTime() {
306 this.expirationTime = System.currentTimeMillis() + this.leaseTimeoutPeriod;
307 }
308 }
309 }