View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.HasThread;
25  
26  import java.util.ConcurrentModificationException;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Delayed;
32  import java.util.concurrent.DelayQueue;
33  import java.util.concurrent.TimeUnit;
34  
35  import java.io.IOException;
36  
37  /**
38   * Leases
39   *
40   * There are several server classes in HBase that need to track external
41   * clients that occasionally send heartbeats.
42   *
43   * <p>These external clients hold resources in the server class.
44   * Those resources need to be released if the external client fails to send a
45   * heartbeat after some interval of time passes.
46   *
47   * <p>The Leases class is a general reusable class for this kind of pattern.
48   * An instance of the Leases class will create a thread to do its dirty work.
49   * You should close() the instance if you want to clean up the thread properly.
50   *
51   * <p>
52   * NOTE: This class extends Thread rather than Chore because the sleep time
53   * can be interrupted when there is something to do, rather than the Chore
54   * sleep time which is invariant.
55   */
56  @InterfaceAudience.Private
57  public class Leases extends HasThread {
58    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
59    public static final int MIN_WAIT_TIME = 100;
60    private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
61  
62    protected final int leaseCheckFrequency;
63    protected volatile boolean stopRequested = false;
64  
65    /**
66     * Creates a lease monitor
67     * 
68     * @param leaseCheckFrequency - how often the lease should be checked
69     *          (milliseconds)
70     */
71    public Leases(final int leaseCheckFrequency) {
72      this.leaseCheckFrequency = leaseCheckFrequency;
73      setDaemon(true);
74    }
75  
76    /**
77     * @see Thread#run()
78     */
79    @Override
80    public void run() {
81      long toWait = leaseCheckFrequency;
82      Lease nextLease = null;
83      long nextLeaseDelay = Long.MAX_VALUE;
84  
85      while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
86  
87        try {
88          if (nextLease != null) {
89            toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
90          }
91  
92          toWait = Math.min(leaseCheckFrequency, toWait);
93          toWait = Math.max(MIN_WAIT_TIME, toWait);
94  
95          Thread.sleep(toWait);
96        } catch (InterruptedException e) {
97          continue;
98        } catch (ConcurrentModificationException e) {
99          continue;
100       } catch (Throwable e) {
101         LOG.fatal("Unexpected exception killed leases thread", e);
102         break;
103       }
104 
105       nextLease = null;
106       nextLeaseDelay = Long.MAX_VALUE;
107       for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
108         Map.Entry<String, Lease> entry = it.next();
109         Lease lease = entry.getValue();
110         long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
111         if ( thisLeaseDelay > 0) {
112           if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
113             nextLease = lease;
114             nextLeaseDelay = thisLeaseDelay;
115           }
116         } else {
117           // A lease expired.  Run the expired code before removing from map
118           // since its presence in map is used to see if lease exists still.
119           if (lease.getListener() == null) {
120             LOG.error("lease listener is null for lease " + lease.getLeaseName());
121           } else {
122             lease.getListener().leaseExpired();
123           }
124           it.remove();
125         }
126       }
127     }
128     close();
129   }
130 
131   /**
132    * Shuts down this lease instance when all outstanding leases expire.
133    * Like {@link #close()} but rather than violently end all leases, waits
134    * first on extant leases to finish.  Use this method if the lease holders
135    * could loose data, leak locks, etc.  Presumes client has shutdown
136    * allocation of new leases.
137    */
138   public void closeAfterLeasesExpire() {
139     this.stopRequested = true;
140   }
141 
142   /**
143    * Shut down this Leases instance.  All pending leases will be destroyed,
144    * without any cancellation calls.
145    */
146   public void close() {
147     LOG.info(Thread.currentThread().getName() + " closing leases");
148     this.stopRequested = true;
149     leases.clear();
150     LOG.info(Thread.currentThread().getName() + " closed leases");
151   }
152 
153   /**
154    * Obtain a lease.
155    *
156    * @param leaseName name of the lease
157    * @param leaseTimeoutPeriod length of the lease in milliseconds
158    * @param listener listener that will process lease expirations
159    * @throws LeaseStillHeldException
160    */
161   public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162       throws LeaseStillHeldException {
163     addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
164   }
165 
166   /**
167    * Inserts lease.  Resets expiration before insertion.
168    * @param lease
169    * @throws LeaseStillHeldException
170    */
171   public void addLease(final Lease lease) throws LeaseStillHeldException {
172     if (this.stopRequested) {
173       return;
174     }
175     lease.resetExpirationTime();
176     if (leases.containsKey(lease.getLeaseName())) {
177       throw new LeaseStillHeldException(lease.getLeaseName());
178     }
179     leases.put(lease.getLeaseName(), lease);
180   }
181 
182   /**
183    * Renew a lease
184    *
185    * @param leaseName name of lease
186    * @throws LeaseException
187    */
188   public void renewLease(final String leaseName) throws LeaseException {
189     Lease lease = leases.get(leaseName);
190     // We need to check to see if the remove is successful as the poll in the run()
191     // method could have completed between the get and the remove which will result
192     // in a corrupt leaseQueue.
193     if (lease == null ) {
194       throw new LeaseException("lease '" + leaseName +
195           "' does not exist or has already expired");
196     }
197     lease.resetExpirationTime();
198   }
199 
200   /**
201    * Client explicitly cancels a lease.
202    * @param leaseName name of lease
203    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
204    */
205   public void cancelLease(final String leaseName) throws LeaseException {
206     removeLease(leaseName);
207   }
208 
209   /**
210    * Remove named lease.
211    * Lease is removed from the list of leases and removed from the delay queue.
212    * Lease can be resinserted using {@link #addLease(Lease)}
213    *
214    * @param leaseName name of lease
215    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
216    * @return Removed lease
217    */
218   Lease removeLease(final String leaseName) throws LeaseException {
219     Lease lease = leases.remove(leaseName);
220     if (lease == null) {
221       throw new LeaseException("lease '" + leaseName + "' does not exist");
222     }
223     return lease;
224   }
225 
226   /**
227    * Thrown if we are asked create a lease but lease on passed name already
228    * exists.
229    */
230   @SuppressWarnings("serial")
231   public static class LeaseStillHeldException extends IOException {
232     private final String leaseName;
233 
234     /**
235      * @param name
236      */
237     public LeaseStillHeldException(final String name) {
238       this.leaseName = name;
239     }
240 
241     /** @return name of lease */
242     public String getName() {
243       return this.leaseName;
244     }
245   }
246 
247   /** This class tracks a single Lease. */
248   static class Lease implements Delayed {
249     private final String leaseName;
250     private final LeaseListener listener;
251     private int leaseTimeoutPeriod;
252     private long expirationTime;
253 
254     Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
255       this.leaseName = leaseName;
256       this.listener = listener;
257       this.leaseTimeoutPeriod = leaseTimeoutPeriod;
258       this.expirationTime = 0;
259     }
260 
261     /** @return the lease name */
262     public String getLeaseName() {
263       return leaseName;
264     }
265 
266     /** @return listener */
267     public LeaseListener getListener() {
268       return this.listener;
269     }
270 
271     @Override
272     public boolean equals(Object obj) {
273       if (this == obj) {
274         return true;
275       }
276       if (obj == null) {
277         return false;
278       }
279       if (getClass() != obj.getClass()) {
280         return false;
281       }
282       return this.hashCode() == obj.hashCode();
283     }
284 
285     @Override
286     public int hashCode() {
287       return this.leaseName.hashCode();
288     }
289 
290     public long getDelay(TimeUnit unit) {
291       return unit.convert(this.expirationTime - System.currentTimeMillis(),
292           TimeUnit.MILLISECONDS);
293     }
294 
295     public int compareTo(Delayed o) {
296       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
297         o.getDelay(TimeUnit.MILLISECONDS);
298 
299       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
300     }
301 
302     /**
303      * Resets the expiration time of the lease.
304      */
305     public void resetExpirationTime() {
306       this.expirationTime = System.currentTimeMillis() + this.leaseTimeoutPeriod;
307     }
308   }
309 }