View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase;
21  
22  import java.text.MessageFormat;
23  
24  import junit.framework.Assert;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  
31  /**
32   * A class that provides a standard waitFor pattern
33   * See details at https://issues.apache.org/jira/browse/HBASE-7384
34   */
35  @InterfaceAudience.Private
36  public final class Waiter {
37  
38    private static final Log LOG = LogFactory.getLog(Waiter.class);
39  
40    /**
41     * System property name whose value is a scale factor to increase time out values dynamically used
42     * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
43     * {@link #waitFor(Configuration, long, long, Predicate)}, and
44     * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
45     * <p/>
46     * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
47     */
48    public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
49  
50    private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
51  
52    private static float waitForRatio = -1;
53  
54    private Waiter() {
55    }
56  
57    /**
58     * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
59     * {@link #waitFor(Configuration, long, Predicate)},
60     * {@link #waitFor(Configuration, long, long, Predicate)} and
61     * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
62     * <p/>
63     * This is useful to dynamically adjust max time out values when same test cases run in different
64     * test machine settings without recompiling & re-deploying code.
65     * <p/>
66     * The value is obtained from the Java System property or configuration setting
67     * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
68     * @param conf the configuration
69     * @return the 'wait for ratio' for the current test run.
70     */
71    public static float getWaitForRatio(Configuration conf) {
72      if (waitForRatio < 0) {
73        // System property takes precedence over configuration setting
74        if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
75          waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
76        } else {
77          waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
78        }
79      }
80      return waitForRatio;
81    }
82  
83    /**
84     * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
85     * {@link Waiter#waitFor(Configuration, long, Predicate)} and
86     * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate) methods.
87     */
88    @InterfaceAudience.Private
89    public interface Predicate<E extends Exception> {
90  
91      /**
92       * Perform a predicate evaluation.
93       * @return the boolean result of the evaluation.
94       * @throws Exception thrown if the predicate evaluation could not evaluate.
95       */
96      boolean evaluate() throws E;
97  
98    }
99  
100   /**
101    * A mixin interface, can be used with {@link Waiter} to explain failed state.
102    */
103   @InterfaceAudience.Private
104   public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
105 
106     /**
107      * Perform a predicate evaluation.
108      *
109      * @return explanation of failed state
110      */
111     String explainFailure() throws E;
112 
113   }
114 
115   /**
116    * Makes the current thread sleep for the duration equal to the specified time in milliseconds
117    * multiplied by the {@link #getWaitForRatio(Configuration)}.
118    * @param conf the configuration
119    * @param time the number of milliseconds to sleep.
120    */
121   public static void sleep(Configuration conf, long time) {
122     try {
123       Thread.sleep((long) (getWaitForRatio(conf) * time));
124     } catch (InterruptedException ex) {
125       LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
126     }
127   }
128 
129   /**
130    * Waits up to the duration equal to the specified timeout multiplied by the
131    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
132    * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
133    * <code>false</code>.
134    * <p/>
135    * @param conf the configuration
136    * @param timeout the timeout in milliseconds to wait for the predicate.
137    * @param predicate the predicate to evaluate.
138    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
139    *         wait is interrupted otherwise <code>-1</code> when times out
140    */
141   public static <E extends Exception> long waitFor(Configuration conf, long timeout,
142       Predicate<E> predicate) throws E {
143     return waitFor(conf, timeout, 100, true, predicate);
144   }
145 
146   /**
147    * Waits up to the duration equal to the specified timeout multiplied by the
148    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
149    * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
150    * <code>false</code>.
151    * <p/>
152    * @param conf the configuration
153    * @param timeout the max timeout in milliseconds to wait for the predicate.
154    * @param interval the interval in milliseconds to evaluate predicate.
155    * @param predicate the predicate to evaluate.
156    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
157    *         wait is interrupted otherwise <code>-1</code> when times out
158    */
159   public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
160       Predicate<E> predicate) throws E {
161     return waitFor(conf, timeout, interval, true, predicate);
162   }
163 
164   /**
165    * Waits up to the duration equal to the specified timeout multiplied by the
166    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
167    * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
168    * <code>false</code> and failIfTimeout is set as <code>true</code>.
169    * <p/>
170    * @param conf the configuration
171    * @param timeout the timeout in milliseconds to wait for the predicate.
172    * @param interval the interval in milliseconds to evaluate predicate.
173    * @param failIfTimeout indicates if should fail current test case when times out.
174    * @param predicate the predicate to evaluate.
175    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
176    *         wait is interrupted otherwise <code>-1</code> when times out
177    */
178   public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
179       boolean failIfTimeout, Predicate<E> predicate) throws E {
180     long started = System.currentTimeMillis();
181     long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
182     long mustEnd = started + adjustedTimeout;
183     long remainderWait = 0;
184     long sleepInterval = 0;
185     Boolean eval = false;
186     Boolean interrupted = false;
187 
188     try {
189       LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
190         adjustedTimeout, getWaitForRatio(conf)));
191       while (!(eval = predicate.evaluate())
192               && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
193         try {
194           // handle tail case when remainder wait is less than one interval
195           sleepInterval = (remainderWait > interval) ? interval : remainderWait;
196           Thread.sleep(sleepInterval);
197         } catch (InterruptedException e) {
198           eval = predicate.evaluate();
199           interrupted = true;
200           break;
201         }
202       }
203       if (!eval) {
204         if (interrupted) {
205           LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
206             System.currentTimeMillis() - started));
207         } else if (failIfTimeout) {
208           String msg = getExplanation(predicate);
209           Assert.fail(MessageFormat
210               .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
211         } else {
212           String msg = getExplanation(predicate);
213           LOG.warn(
214               MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
215         }
216       }
217       return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
218     } catch (Exception ex) {
219       throw new RuntimeException(ex);
220     }
221   }
222 
223   public static String getExplanation(Predicate explain) {
224     if (explain instanceof ExplainingPredicate) {
225       try {
226         return " " + ((ExplainingPredicate) explain).explainFailure();
227       } catch (Exception e) {
228         LOG.error("Failed to get explanation, ", e);
229         return e.getMessage();
230       }
231     } else {
232       return "";
233     }
234   }
235 
236 }