View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.lang.reflect.UndeclaredThrowableException;
25  import java.net.SocketTimeoutException;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.DoNotRetryIOException;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.ipc.RpcClient;
35  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36  import org.apache.hadoop.hbase.util.ExceptionUtil;
37  import org.apache.hadoop.ipc.RemoteException;
38  
39  import com.google.protobuf.ServiceException;
40  
41  /**
42   * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
43   * threadlocal outstanding timeouts as so we don't persist too much.
44   * Dynamic rather than static so can set the generic appropriately.
45   */
46  @InterfaceAudience.Private
47  @edu.umd.cs.findbugs.annotations.SuppressWarnings
48      (value = "IS2_INCONSISTENT_SYNC", justification = "na")
49  public class RpcRetryingCaller<T> {
50    static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
51    /**
52     * Timeout for the call including retries
53     */
54    private int callTimeout;
55    /**
56     * When we started making calls.
57     */
58    private long globalStartTime;
59    /**
60     * Start and end times for a single call.
61     */
62    private final static int MIN_RPC_TIMEOUT = 2000;
63  
64    private final long pause;
65    private final int retries;
66  
67    public RpcRetryingCaller(long pause, int retries) {
68      this.pause = pause;
69      this.retries = retries;
70    }
71  
72    private void beforeCall() {
73      int remaining = (int)(callTimeout -
74        (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
75      if (remaining < MIN_RPC_TIMEOUT) {
76        // If there is no time left, we're trying anyway. It's too late.
77        // 0 means no timeout, and it's not the intent here. So we secure both cases by
78        // resetting to the minimum.
79        remaining = MIN_RPC_TIMEOUT;
80      }
81      RpcClient.setRpcTimeout(remaining);
82    }
83  
84    private void afterCall() {
85      RpcClient.resetRpcTimeout();
86    }
87  
88    public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
89        RuntimeException {
90      return callWithRetries(callable, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
91    }
92  
93    /**
94     * Retries if invocation fails.
95     * @param callTimeout Timeout for this call
96     * @param callable The {@link RetryingCallable} to run.
97     * @return an object of type T
98     * @throws IOException if a remote or network exception occurs
99     * @throws RuntimeException other unspecified error
100    */
101   @edu.umd.cs.findbugs.annotations.SuppressWarnings
102       (value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "na")
103   public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
104   throws IOException, RuntimeException {
105     this.callTimeout = callTimeout;
106     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
107       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
108     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
109     for (int tries = 0;; tries++) {
110       long expectedSleep = 0;
111       try {
112         beforeCall();
113         callable.prepare(tries != 0); // if called with false, check table status on ZK
114         return callable.call();
115       } catch (Throwable t) {
116         if (LOG.isTraceEnabled()) {
117           LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
118               (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
119         }
120         // translateException throws exception when should not retry: i.e. when request is bad.
121         t = translateException(t);
122         callable.throwable(t, retries != 1);
123         RetriesExhaustedException.ThrowableWithExtraContext qt =
124             new RetriesExhaustedException.ThrowableWithExtraContext(t,
125                 EnvironmentEdgeManager.currentTimeMillis(), toString());
126         exceptions.add(qt);
127         ExceptionUtil.rethrowIfInterrupt(t);
128         if (tries >= retries - 1) {
129           throw new RetriesExhaustedException(tries, exceptions);
130         }
131         // If the server is dead, we need to wait a little before retrying, to give
132         //  a chance to the regions to be
133         // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
134         expectedSleep = callable.sleep(pause, tries + 1);
135 
136         // If, after the planned sleep, there won't be enough time left, we stop now.
137         long duration = singleCallDuration(expectedSleep);
138         if (duration > this.callTimeout) {
139           String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration +
140               ": " + callable.getExceptionMessageAdditionalDetail();
141           throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
142         }
143       } finally {
144         afterCall();
145       }
146       try {
147         Thread.sleep(expectedSleep);
148       } catch (InterruptedException e) {
149         throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
150       }
151     }
152   }
153 
154   /**
155    * @param expectedSleep
156    * @return Calculate how long a single call took
157    */
158   private long singleCallDuration(final long expectedSleep) {
159     return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
160       + MIN_RPC_TIMEOUT + expectedSleep;
161   }
162 
163   /**
164    * Call the server once only.
165    * {@link RetryingCallable} has a strange shape so we can do retrys.  Use this invocation if you
166    * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely
167    * succeed).
168    * @return an object of type T
169    * @throws IOException if a remote or network exception occurs
170    * @throws RuntimeException other unspecified error
171    */
172   public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
173   throws IOException, RuntimeException {
174     // The code of this method should be shared with withRetries.
175     this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
176     this.callTimeout = callTimeout;
177     try {
178       beforeCall();
179       callable.prepare(false);
180       return callable.call();
181     } catch (Throwable t) {
182       Throwable t2 = translateException(t);
183       ExceptionUtil.rethrowIfInterrupt(t2);
184       // It would be nice to clear the location cache here.
185       if (t2 instanceof IOException) {
186         throw (IOException)t2;
187       } else {
188         throw new RuntimeException(t2);
189       }
190     } finally {
191       afterCall();
192     }
193   }
194 
195 
196   /**
197    * Get the good or the remote exception if any, throws the DoNotRetryIOException.
198    * @param t the throwable to analyze
199    * @return the translated exception, if it's not a DoNotRetryIOException
200    * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
201    */
202   static Throwable translateException(Throwable t) throws DoNotRetryIOException {
203     if (t instanceof UndeclaredThrowableException) {
204       if (t.getCause() != null) {
205         t = t.getCause();
206       }
207     }
208     if (t instanceof RemoteException) {
209       t = ((RemoteException)t).unwrapRemoteException();
210     }
211     if (t instanceof LinkageError) {
212       throw new DoNotRetryIOException(t);
213     }
214     if (t instanceof ServiceException) {
215       ServiceException se = (ServiceException)t;
216       Throwable cause = se.getCause();
217       if (cause != null && cause instanceof DoNotRetryIOException) {
218         throw (DoNotRetryIOException)cause;
219       }
220       // Don't let ServiceException out; its rpc specific.
221       t = cause;
222       // t could be a RemoteException so go aaround again.
223       translateException(t);
224     } else if (t instanceof DoNotRetryIOException) {
225       throw (DoNotRetryIOException)t;
226     }
227     return t;
228   }
229 }