View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.lang.management.ManagementFactory;
24  import java.util.ConcurrentModificationException;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.SortedMap;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.DelayQueue;
32  import java.util.concurrent.Delayed;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.DroppedSnapshotException;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.RemoteExceptionHandler;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.hbase.util.Threads;
49  import org.apache.hadoop.util.StringUtils;
50  import org.cliffc.high_scale_lib.Counter;
51  
52  import com.google.common.base.Preconditions;
53  import org.cloudera.htrace.Trace;
54  import org.cloudera.htrace.TraceScope;
55  
56  /**
57   * Thread that flushes cache on request
58   *
 * NOTE: Flushing is done by dedicated FlushHandler threads rather than a Chore
 * because a handler's wait can be cut short when there is something to do,
 * unlike the Chore sleep time which is invariant.
62   *
63   * @see FlushRequester
64   */
65  @InterfaceAudience.Private
66  class MemStoreFlusher implements FlushRequester {
67    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
68    // These two data members go together.  Any entry in the one must have
69    // a corresponding entry in the other.
70    private final BlockingQueue<FlushQueueEntry> flushQueue =
71      new DelayQueue<FlushQueueEntry>();
72    private final Map<HRegion, FlushRegionEntry> regionsInQueue =
73      new HashMap<HRegion, FlushRegionEntry>();
74    private AtomicBoolean wakeupPending = new AtomicBoolean();
75  
76    private final long threadWakeFrequency;
77    private final HRegionServer server;
78    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
79    private final Object blockSignal = new Object();
80  
81    protected final long globalMemStoreLimit;
82    protected final long globalMemStoreLimitLowMark;
83  
84    static final float DEFAULT_UPPER = 0.4f;
85    private static final float DEFAULT_LOWER = 0.35f;
86    static final String UPPER_KEY =
87      "hbase.regionserver.global.memstore.upperLimit";
88    private static final String LOWER_KEY =
89      "hbase.regionserver.global.memstore.lowerLimit";
90    private long blockingWaitTime;
91    private final Counter updatesBlockedMsHighWater = new Counter();
92  
93    private final FlushHandler[] flushHandlers;
94  
95    /**
96     * @param conf
97     * @param server
98     */
99    public MemStoreFlusher(final Configuration conf,
100       final HRegionServer server) {
101     super();
102     this.server = server;
103     this.threadWakeFrequency =
104       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
105     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
106     this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
107       UPPER_KEY, conf);
108     long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
109     if (lower > this.globalMemStoreLimit) {
110       lower = this.globalMemStoreLimit;
111       LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
112         "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
113     }
114     this.globalMemStoreLimitLowMark = lower;
115     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
116       90000);
117     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
118     this.flushHandlers = new FlushHandler[handlerCount];
119     LOG.info("globalMemStoreLimit=" +
120       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
121       ", globalMemStoreLimitLowMark=" +
122       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
123       ", maxHeap=" + StringUtils.humanReadableInt(max));
124   }
125 
126   /**
127    * Calculate size using passed <code>key</code> for configured
128    * percentage of <code>max</code>.
129    * @param max
130    * @param defaultLimit
131    * @param key
132    * @param c
133    * @return Limit.
134    */
135   static long globalMemStoreLimit(final long max,
136      final float defaultLimit, final String key, final Configuration c) {
137     float limit = c.getFloat(key, defaultLimit);
138     return getMemStoreLimit(max, limit, defaultLimit);
139   }
140 
141   static long getMemStoreLimit(final long max, final float limit,
142       final float defaultLimit) {
143     float effectiveLimit = limit;
144     if (limit >= 0.9f || limit < 0.1f) {
145       LOG.warn("Setting global memstore limit to default of " + defaultLimit +
146         " because supplied value outside allowed range of 0.1 -> 0.9");
147       effectiveLimit = defaultLimit;
148     }
149     return (long)(max * effectiveLimit);
150   }
151 
152   public Counter getUpdatesBlockedMsHighWater() {
153     return this.updatesBlockedMsHighWater;
154   }
155 
156   /**
157    * The memstore across all regions has exceeded the low water mark. Pick
158    * one region to flush and flush it synchronously (this is called from the
159    * flush thread)
160    * @return true if successful
161    */
162   private boolean flushOneForGlobalPressure() {
163     SortedMap<Long, HRegion> regionsBySize =
164         server.getCopyOfOnlineRegionsSortedBySize();
165 
166     Set<HRegion> excludedRegions = new HashSet<HRegion>();
167 
168     boolean flushedOne = false;
169     while (!flushedOne) {
170       // Find the biggest region that doesn't have too many storefiles
171       // (might be null!)
172       HRegion bestFlushableRegion = getBiggestMemstoreRegion(
173           regionsBySize, excludedRegions, true);
174       // Find the biggest region, total, even if it might have too many flushes.
175       HRegion bestAnyRegion = getBiggestMemstoreRegion(
176           regionsBySize, excludedRegions, false);
177 
178       if (bestAnyRegion == null) {
179         LOG.error("Above memory mark but there are no flushable regions!");
180         return false;
181       }
182 
183       HRegion regionToFlush;
184       if (bestFlushableRegion != null &&
185           bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
186         // Even if it's not supposed to be flushed, pick a region if it's more than twice
187         // as big as the best flushable one - otherwise when we're under pressure we make
188         // lots of little flushes and cause lots of compactions, etc, which just makes
189         // life worse!
190         if (LOG.isDebugEnabled()) {
191           LOG.debug("Under global heap pressure: " +
192             "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
193             "store files, but is " +
194             StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
195             " vs best flushable region's " +
196             StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
197             ". Choosing the bigger.");
198         }
199         regionToFlush = bestAnyRegion;
200       } else {
201         if (bestFlushableRegion == null) {
202           regionToFlush = bestAnyRegion;
203         } else {
204           regionToFlush = bestFlushableRegion;
205         }
206       }
207 
208       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
209 
210       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
211       flushedOne = flushRegion(regionToFlush, true);
212       if (!flushedOne) {
213         LOG.info("Excluding unflushable region " + regionToFlush +
214           " - trying to find a different region to flush.");
215         excludedRegions.add(regionToFlush);
216       }
217     }
218     return true;
219   }
220 
221   private class FlushHandler extends HasThread {
222 
223     private FlushHandler(String name) {
224       super(name);
225     }
226 
227     @Override
228     public void run() {
229       while (!server.isStopped()) {
230         FlushQueueEntry fqe = null;
231         try {
232           wakeupPending.set(false); // allow someone to wake us up again
233           fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
234           if (fqe == null || fqe instanceof WakeupFlushThread) {
235             if (isAboveLowWaterMark()) {
236               LOG.debug("Flush thread woke up because memory above low water="
237                   + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
238               if (!flushOneForGlobalPressure()) {
239                 // Wasn't able to flush any region, but we're above low water mark
240                 // This is unlikely to happen, but might happen when closing the
241                 // entire server - another thread is flushing regions. We'll just
242                 // sleep a little bit to avoid spinning, and then pretend that
243                 // we flushed one, so anyone blocked will check again
244                 Thread.sleep(1000);
245                 wakeUpIfBlocking();
246               }
247               // Enqueue another one of these tokens so we'll wake up again
248               wakeupFlushThread();
249             }
250             continue;
251           }
252           FlushRegionEntry fre = (FlushRegionEntry) fqe;
253           if (!flushRegion(fre)) {
254             break;
255           }
256         } catch (InterruptedException ex) {
257           continue;
258         } catch (ConcurrentModificationException ex) {
259           continue;
260         } catch (Exception ex) {
261           LOG.error("Cache flusher failed for entry " + fqe, ex);
262           if (!server.checkFileSystem()) {
263             break;
264           }
265         }
266       }
267       synchronized (regionsInQueue) {
268         regionsInQueue.clear();
269         flushQueue.clear();
270       }
271 
272       // Signal anyone waiting, so they see the close flag
273       wakeUpIfBlocking();
274       LOG.info(getName() + " exiting");
275     }
276   }
277 
278 
279   private void wakeupFlushThread() {
280     if (wakeupPending.compareAndSet(false, true)) {
281       flushQueue.add(new WakeupFlushThread());
282     }
283   }
284 
285   private HRegion getBiggestMemstoreRegion(
286       SortedMap<Long, HRegion> regionsBySize,
287       Set<HRegion> excludedRegions,
288       boolean checkStoreFileCount) {
289     synchronized (regionsInQueue) {
290       for (HRegion region : regionsBySize.values()) {
291         if (excludedRegions.contains(region)) {
292           continue;
293         }
294 
295         if (region.writestate.flushing || !region.writestate.writesEnabled) {
296           continue;
297         }
298 
299         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
300           continue;
301         }
302         return region;
303       }
304     }
305     return null;
306   }
307 
308   /**
309    * Return true if global memory usage is above the high watermark
310    */
311   private boolean isAboveHighWaterMark() {
312     return server.getRegionServerAccounting().
313       getGlobalMemstoreSize() >= globalMemStoreLimit;
314   }
315 
316   /**
317    * Return true if we're above the high watermark
318    */
319   private boolean isAboveLowWaterMark() {
320     return server.getRegionServerAccounting().
321       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
322   }
323 
324   public void requestFlush(HRegion r) {
325     synchronized (regionsInQueue) {
326       if (!regionsInQueue.containsKey(r)) {
327         // This entry has no delay so it will be added at the top of the flush
328         // queue.  It'll come out near immediately.
329         FlushRegionEntry fqe = new FlushRegionEntry(r);
330         this.regionsInQueue.put(r, fqe);
331         this.flushQueue.add(fqe);
332       }
333     }
334   }
335 
336   public void requestDelayedFlush(HRegion r, long delay) {
337     synchronized (regionsInQueue) {
338       if (!regionsInQueue.containsKey(r)) {
339         // This entry has some delay
340         FlushRegionEntry fqe = new FlushRegionEntry(r);
341         fqe.requeue(delay);
342         this.regionsInQueue.put(r, fqe);
343         this.flushQueue.add(fqe);
344       }
345     }
346   }
347 
348   public int getFlushQueueSize() {
349     return flushQueue.size();
350   }
351 
352   /**
353    * Only interrupt once it's done with a run through the work loop.
354    */
355   void interruptIfNecessary() {
356     lock.writeLock().lock();
357     try {
358       for (FlushHandler flushHander : flushHandlers) {
359         if (flushHander != null) flushHander.interrupt();
360       }
361     } finally {
362       lock.writeLock().unlock();
363     }
364   }
365 
366   synchronized void start(UncaughtExceptionHandler eh) {
367     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
368         server.getServerName().toShortString() + "-MemStoreFlusher", eh);
369     for (int i = 0; i < flushHandlers.length; i++) {
370       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
371       flusherThreadFactory.newThread(flushHandlers[i]);
372       flushHandlers[i].start();
373     }
374   }
375 
376   boolean isAlive() {
377     for (FlushHandler flushHander : flushHandlers) {
378       if (flushHander != null && flushHander.isAlive()) {
379         return true;
380       }
381     }
382     return false;
383   }
384 
385   void join() {
386     for (FlushHandler flushHander : flushHandlers) {
387       if (flushHander != null) {
388         Threads.shutdown(flushHander.getThread());
389       }
390     }
391   }
392 
393   /*
394    * A flushRegion that checks store file count.  If too many, puts the flush
395    * on delay queue to retry later.
396    * @param fqe
397    * @return true if the region was successfully flushed, false otherwise. If
398    * false, there will be accompanying log messages explaining why the log was
399    * not flushed.
400    */
401   private boolean flushRegion(final FlushRegionEntry fqe) {
402     HRegion region = fqe.region;
403     if (!region.getRegionInfo().isMetaRegion() &&
404         isTooManyStoreFiles(region)) {
405       if (fqe.isMaximumWait(this.blockingWaitTime)) {
406         LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
407           "ms on a compaction to clean up 'too many store files'; waited " +
408           "long enough... proceeding with flush of " +
409           region.getRegionNameAsString());
410       } else {
411         // If this is first time we've been put off, then emit a log message.
412         if (fqe.getRequeueCount() <= 0) {
413           // Note: We don't impose blockingStoreFiles constraint on meta regions
414           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
415             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
416           if (!this.server.compactSplitThread.requestSplit(region)) {
417             try {
418               this.server.compactSplitThread.requestSystemCompaction(
419                   region, Thread.currentThread().getName());
420             } catch (IOException e) {
421               LOG.error(
422                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
423                 RemoteExceptionHandler.checkIOException(e));
424             }
425           }
426         }
427 
428         // Put back on the queue.  Have it come back out of the queue
429         // after a delay of this.blockingWaitTime / 100 ms.
430         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
431         // Tell a lie, it's not flushed but it's ok
432         return true;
433       }
434     }
435     return flushRegion(region, false);
436   }
437 
438   /*
439    * Flush a region.
440    * @param region Region to flush.
441    * @param emergencyFlush Set if we are being force flushed. If true the region
442    * needs to be removed from the flush queue. If false, when we were called
443    * from the main flusher run loop and we got the entry to flush by calling
444    * poll on the flush queue (which removed it).
445    *
446    * @return true if the region was successfully flushed, false otherwise. If
447    * false, there will be accompanying log messages explaining why the log was
448    * not flushed.
449    */
450   private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
451     synchronized (this.regionsInQueue) {
452       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
453       if (fqe != null && emergencyFlush) {
454         // Need to remove from region from delay queue.  When NOT an
455         // emergencyFlush, then item was removed via a flushQueue.poll.
456         flushQueue.remove(fqe);
457      }
458     }
459     lock.readLock().lock();
460     try {
461       boolean shouldCompact = region.flushcache().isCompactionNeeded();
462       // We just want to check the size
463       boolean shouldSplit = region.checkSplit() != null;
464       if (shouldSplit) {
465         this.server.compactSplitThread.requestSplit(region);
466       } else if (shouldCompact) {
467         server.compactSplitThread.requestSystemCompaction(
468             region, Thread.currentThread().getName());
469       }
470 
471     } catch (DroppedSnapshotException ex) {
472       // Cache flush can fail in a few places. If it fails in a critical
473       // section, we get a DroppedSnapshotException and a replay of hlog
474       // is required. Currently the only way to do this is a restart of
475       // the server. Abort because hdfs is probably bad (HBASE-644 is a case
476       // where hdfs was bad but passed the hdfs check).
477       server.abort("Replay of HLog required. Forcing server shutdown", ex);
478       return false;
479     } catch (IOException ex) {
480       LOG.error("Cache flush failed" +
481         (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
482         RemoteExceptionHandler.checkIOException(ex));
483       if (!server.checkFileSystem()) {
484         return false;
485       }
486     } finally {
487       lock.readLock().unlock();
488       wakeUpIfBlocking();
489     }
490     return true;
491   }
492 
493   private void wakeUpIfBlocking() {
494     synchronized (blockSignal) {
495       blockSignal.notifyAll();
496     }
497   }
498 
499   private boolean isTooManyStoreFiles(HRegion region) {
500     for (Store store : region.stores.values()) {
501       if (store.hasTooManyStoreFiles()) {
502         return true;
503       }
504     }
505     return false;
506   }
507 
508   /**
509    * Check if the regionserver's memstore memory usage is greater than the
510    * limit. If so, flush regions with the biggest memstores until we're down
511    * to the lower limit. This method blocks callers until we're down to a safe
512    * amount of memstore consumption.
513    */
514   public void reclaimMemStoreMemory() {
515     TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
516     if (isAboveHighWaterMark()) {
517       if (Trace.isTracing()) {
518         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
519       }
520       long start = System.currentTimeMillis();
521       synchronized (this.blockSignal) {
522         boolean blocked = false;
523         long startTime = 0;
524         while (isAboveHighWaterMark() && !server.isStopped()) {
525           if (!blocked) {
526             startTime = EnvironmentEdgeManager.currentTimeMillis();
527             LOG.info("Blocking updates on " + server.toString() +
528             ": the global memstore size " +
529             StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
530             " is >= than blocking " +
531             StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
532           }
533           blocked = true;
534           wakeupFlushThread();
535           try {
536             // we should be able to wait forever, but we've seen a bug where
537             // we miss a notify, so put a 5 second bound on it at least.
538             blockSignal.wait(5 * 1000);
539           } catch (InterruptedException ie) {
540             Thread.currentThread().interrupt();
541           }
542           long took = System.currentTimeMillis() - start;
543           LOG.warn("Memstore is above high water mark and block " + took + "ms");
544         }
545         if(blocked){
546           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
547           if(totalTime > 0){
548             this.updatesBlockedMsHighWater.add(totalTime);
549           }
550           LOG.info("Unblocking updates for server " + server.toString());
551         }
552       }
553     } else if (isAboveLowWaterMark()) {
554       wakeupFlushThread();
555     }
556     scope.close();
557   }
558   @Override
559   public String toString() {
560     return "flush_queue="
561         + flushQueue.size();
562   }
563 
564   public String dumpQueue() {
565     StringBuilder queueList = new StringBuilder();
566     queueList.append("Flush Queue Queue dump:\n");
567     queueList.append("  Flush Queue:\n");
568     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
569 
570     while(it.hasNext()){
571       queueList.append("    "+it.next().toString());
572       queueList.append("\n");
573     }
574 
575     return queueList.toString();
576   }
577 
578   interface FlushQueueEntry extends Delayed {}
579 
580   /**
581    * Token to insert into the flush queue that ensures that the flusher does not sleep
582    */
583   static class WakeupFlushThread implements FlushQueueEntry {
584     @Override
585     public long getDelay(TimeUnit unit) {
586       return 0;
587     }
588 
589     @Override
590     public int compareTo(Delayed o) {
591       return -1;
592     }
593 
594     @Override
595     public boolean equals(Object obj) {
596       return (this == obj);
597     }
598 
599   }
600 
601   /**
602    * Datastructure used in the flush queue.  Holds region and retry count.
603    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
604    * construction, the delay is zero. When added to a delay queue, we'll come
605    * out near immediately.  Call {@link #requeue(long)} passing delay in
606    * milliseconds before readding to delay queue if you want it to stay there
607    * a while.
608    */
609   static class FlushRegionEntry implements FlushQueueEntry {
610     private final HRegion region;
611 
612     private final long createTime;
613     private long whenToExpire;
614     private int requeueCount = 0;
615 
616     FlushRegionEntry(final HRegion r) {
617       this.region = r;
618       this.createTime = System.currentTimeMillis();
619       this.whenToExpire = this.createTime;
620     }
621 
622     /**
623      * @param maximumWait
624      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
625      */
626     public boolean isMaximumWait(final long maximumWait) {
627       return (System.currentTimeMillis() - this.createTime) > maximumWait;
628     }
629 
630     /**
631      * @return Count of times {@link #requeue(long)} was called; i.e this is
632      * number of times we've been requeued.
633      */
634     public int getRequeueCount() {
635       return this.requeueCount;
636     }
637 
638     /**
639      * @param when When to expire, when to come up out of the queue.
640      * Specify in milliseconds.  This method adds System.currentTimeMillis()
641      * to whatever you pass.
642      * @return This.
643      */
644     public FlushRegionEntry requeue(final long when) {
645       this.whenToExpire = System.currentTimeMillis() + when;
646       this.requeueCount++;
647       return this;
648     }
649 
650     @Override
651     public long getDelay(TimeUnit unit) {
652       return unit.convert(this.whenToExpire - System.currentTimeMillis(),
653           TimeUnit.MILLISECONDS);
654     }
655 
656     @Override
657     public int compareTo(Delayed other) {
658       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
659         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
660     }
661 
662     @Override
663     public String toString() {
664       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
665     }
666 
667     @Override
668     public boolean equals(Object obj) {
669       if (this == obj) {
670         return true;
671       }
672       if (obj == null || getClass() != obj.getClass()) {
673         return false;
674       }
675       Delayed other = (Delayed) obj;
676       return compareTo(other) == 0;
677     }
678   }
679 }