View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.RetryImmediatelyException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.RegionLocations;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.htrace.Trace;
61  
62  import com.google.common.annotations.VisibleForTesting;
63  
64  /**
65   * This class  allows a continuous flow of requests. It's written to be compatible with a
66   * synchronous caller such as HTable.
67   * <p>
 * The caller sends a buffer of operations by calling submit. This class extracts from this list
 * the operations it can send, i.e. the operations that are on regions that are not considered
 * busy. The process is asynchronous, i.e. it returns immediately when it has finished
 * iterating over the list. If, and only if, the maximum number of concurrent tasks is reached,
 * the call to submit will block. Alternatively, the caller can call submitAll, in which case all
 * the operations will be sent. Each call to submit returns a future-like object that can be used
 * to track operation progress.
75   * </p>
76   * <p>
77   * The class manages internally the retries.
78   * </p>
79   * <p>
80   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
81   * AP tracks errors across all calls (each "future" also has global view of all errors). That
82   * mode is necessary for backward compat with HTable behavior, where multiple submissions are
83   * made and the errors can propagate using any put/flush call, from previous calls.
84   * In "regular" mode, the errors are tracked inside the Future object that is returned.
85   * The results are always tracked inside the Future object and can be retrieved when the call
86   * has finished. Partial results can also be retrieved if some part of multi-request failed.
87   * </p>
88   * <p>
 * This class is thread safe in regular mode; in global error mode, submitting operations and
 * retrieving errors from different threads may not be thread safe.
91   * Internally, the class is thread safe enough to manage simultaneously new submission and results
92   * arising from older operations.
93   * </p>
94   * <p>
95   * Internally, this class works with {@link Row}, this mean it could be theoretically used for
96   * gets as well.
97   * </p>
98   */
99  @InterfaceAudience.Private
100 class AsyncProcess {
  /** Logger shared by all AsyncProcess instances. */
  private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
  /** Process-wide counter; the constructor increments it to obtain each instance's {@code id}. */
  protected static final AtomicLong COUNTER = new AtomicLong();

  /**
   * Configuration key read by the constructor into {@code primaryCallTimeoutMicroseconds}
   * (default 10000 when the key is not set).
   */
  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

  /**
   * Configure the number of failures after which the client will start logging. A few failures
   * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
   * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
   * this stage.
   */
  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
      "hbase.client.start.log.errors.counter";
  /** Value used when {@link #START_LOG_ERRORS_AFTER_COUNT_KEY} is not configured. */
  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115 
116   /**
117    * The context used to wait for results from one submit call.
118    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
119    *    then errors and failed operations in this object will reflect global errors.
120    * 2) If submit call is made with needResults false, results will not be saved.
121    *  */
122   public static interface AsyncRequestFuture {
123     public boolean hasError();
124     public RetriesExhaustedWithDetailsException getErrors();
125     public List<? extends Row> getFailedOperations();
126     public Object[] getResults() throws InterruptedIOException;
127     /** Wait until all tasks are executed, successfully or not. */
128     public void waitUntilDone() throws InterruptedIOException;
129   }
130 
131   /**
132    * Return value from a submit that didn't contain any requests.
133    */
134   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
135 
136     final Object[] result = new Object[0];
137 
138     @Override
139     public boolean hasError() {
140       return false;
141     }
142 
143     @Override
144     public RetriesExhaustedWithDetailsException getErrors() {
145       return null;
146     }
147 
148     @Override
149     public List<? extends Row> getFailedOperations() {
150       return null;
151     }
152 
153     @Override
154     public Object[] getResults() {
155       return result;
156     }
157 
158     @Override
159     public void waitUntilDone() throws InterruptedIOException {
160     }
161   };
162 
163   /** Sync point for calls to multiple replicas for the same user request (Get).
164    * Created and put in the results array (we assume replica calls require results) when
165    * the replica calls are launched. See results for details of this process.
166    * POJO, all fields are public. To modify them, the object itself is locked. */
167   private static class ReplicaResultState {
168     public ReplicaResultState(int callCount) {
169       this.callCount = callCount;
170     }
171 
172     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
173     int callCount;
174     /** Errors for which it is not decided whether we will report them to user. If one of the
175      * calls succeeds, we will discard the errors that may have happened in the other calls. */
176     BatchErrors replicaErrors = null;
177 
178     @Override
179     public String toString() {
180       return "[call count " + callCount + "; errors " + replicaErrors + "]";
181     }
182   }
183 
184 
  // TODO: many of the fields should be made private
  /** Identifier of this instance; the constructor takes it from {@code COUNTER}. */
  protected final long id;

  /** Cluster connection; used in this class to locate regions and get the nonce generator. */
  protected final ClusterConnection connection;
  /** Caller factory handed to the constructor. NOTE(review): usage is outside this view. */
  protected final RpcRetryingCallerFactory rpcCallerFactory;
  /** Controller factory handed to the constructor. NOTE(review): usage is outside this view. */
  protected final RpcControllerFactory rpcFactory;
  /** Error accumulator; non-null only when the instance was built with useGlobalErrors. */
  protected final BatchErrors globalErrors;
  /** Default executor; may be null, in which case each request must bring its own pool. */
  protected final ExecutorService pool;

  /** Task counter compared against {@link #maxTotalConcurrentTasks} before taking work. */
  protected final AtomicLong tasksInProgress = new AtomicLong(0);
  /** Per-region task counters, keyed by region name (ordered with Bytes.BYTES_COMPARATOR). */
  protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
      new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
  /** Per-server task counters, keyed by server name. */
  protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
      new ConcurrentHashMap<ServerName, AtomicInteger>();

  // Start configuration settings.
  /** Read from START_LOG_ERRORS_AFTER_COUNT_KEY in the constructor. */
  private final int startLogErrorsCnt;

  /**
   * The number of tasks simultaneously executed on the cluster.
   */
  protected final int maxTotalConcurrentTasks;

  /**
   * The number of tasks we run in parallel on a single region.
   * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
   * a set of operations on a region before the previous one is done. As well, this limits
   * the pressure we put on the region server.
   */
  protected final int maxConcurrentTasksPerRegion;

  /**
   * The number of task simultaneously executed on a single region server.
   */
  protected final int maxConcurrentTasksPerServer;
  /** Base pause, read from HConstants.HBASE_CLIENT_PAUSE. */
  protected final long pause;
  /** Number of tries, read from HConstants.HBASE_CLIENT_RETRIES_NUMBER. */
  protected int numTries;
  /** Sum of the pause times of all {@link #numTries} tries; computed in the constructor. */
  protected int serverTrackerTimeout;
  /** RPC timeout, read from HConstants.HBASE_RPC_TIMEOUT_KEY. */
  protected int timeout;
  /** Read from PRIMARY_CALL_TIMEOUT_KEY; a value of 0 or less disables the primary-call wait. */
  protected long primaryCallTimeoutMicroseconds;
  // End configuration settings.
226 
227   protected static class BatchErrors {
228     private final List<Throwable> throwables = new ArrayList<Throwable>();
229     private final List<Row> actions = new ArrayList<Row>();
230     private final List<String> addresses = new ArrayList<String>();
231 
232     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
233       if (row == null){
234         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
235       }
236 
237       throwables.add(ex);
238       actions.add(row);
239       addresses.add(serverName != null ? serverName.toString() : "null");
240     }
241 
242     public boolean hasErrors() {
243       return !throwables.isEmpty();
244     }
245 
246     private synchronized RetriesExhaustedWithDetailsException makeException() {
247       return new RetriesExhaustedWithDetailsException(
248           new ArrayList<Throwable>(throwables),
249           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
250     }
251 
252     public synchronized void clear() {
253       throwables.clear();
254       actions.clear();
255       addresses.clear();
256     }
257 
258     public synchronized void merge(BatchErrors other) {
259       throwables.addAll(other.throwables);
260       actions.addAll(other.actions);
261       addresses.addAll(other.addresses);
262     }
263   }
264 
265   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
266       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
267     if (hc == null) {
268       throw new IllegalArgumentException("HConnection cannot be null.");
269     }
270 
271     this.connection = hc;
272     this.pool = pool;
273     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
274 
275     this.id = COUNTER.incrementAndGet();
276 
277     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
278         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
279     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
280         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
281     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
282         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
283     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
284 
285     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
286       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
287     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
288           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
289     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
290           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
291 
292     this.startLogErrorsCnt =
293         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
294 
295     if (this.maxTotalConcurrentTasks <= 0) {
296       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
297     }
298     if (this.maxConcurrentTasksPerServer <= 0) {
299       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
300           maxConcurrentTasksPerServer);
301     }
302     if (this.maxConcurrentTasksPerRegion <= 0) {
303       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
304           maxConcurrentTasksPerRegion);
305     }
306 
307     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
308     // However, if we are too useful, we might fail very quickly due to retry count limit.
309     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
310     // retry time if normal retries were used. Then we will retry until this time runs out.
311     // If we keep hitting one server, the net effect will be the incremental backoff, and
312     // essentially the same number of retries as planned. If we have to do faster retries,
313     // we will do more retries in aggregate, but the user will be none the wiser.
314     this.serverTrackerTimeout = 0;
315     for (int i = 0; i < this.numTries; ++i) {
316       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
317     }
318 
319     this.rpcCallerFactory = rpcCaller;
320     this.rpcFactory = rpcFactory;
321   }
322 
323   /**
324    * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
325    *         RuntimeException
326    */
327   private ExecutorService getPool(ExecutorService pool) {
328     if (pool != null) {
329       return pool;
330     }
331     if (this.pool != null) {
332       return this.pool;
333     }
334     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
335   }
336 
337   /**
338    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
339    * Uses default ExecutorService for this AP (must have been created with one).
340    */
341   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
342       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
343       throws InterruptedIOException {
344     return submit(null, tableName, rows, atLeastOne, callback, needResults);
345   }
346 
347   /**
348    * Extract from the rows list what we can submit. The rows we can not submit are kept in the
349    * list. Does not send requests to replicas (not currently used for anything other
350    * than streaming puts anyway).
351    *
352    * @param pool ExecutorService to use.
353    * @param tableName The table for which this request is needed.
354    * @param callback Batch callback. Only called on success (94 behavior).
355    * @param needResults Whether results are needed, or can be discarded.
356    * @param rows - the submitted row. Modified by the method: we remove the rows we took.
357    * @param atLeastOne true if we should submit at least a subset.
358    */
359   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
360       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
361       boolean needResults) throws InterruptedIOException {
362     if (rows.isEmpty()) {
363       return NO_REQS_RESULT;
364     }
365 
366     Map<ServerName, MultiAction<Row>> actionsByServer =
367         new HashMap<ServerName, MultiAction<Row>>();
368     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
369 
370     NonceGenerator ng = this.connection.getNonceGenerator();
371     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
372 
373     // Location errors that happen before we decide what requests to take.
374     List<Exception> locationErrors = null;
375     List<Integer> locationErrorRows = null;
376     do {
377       // Wait until there is at least one slot for a new task.
378       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
379 
380       // Remember the previous decisions about regions or region servers we put in the
381       //  final multi.
382       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
383       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
384 
385       int posInList = -1;
386       Iterator<? extends Row> it = rows.iterator();
387       while (it.hasNext()) {
388         Row r = it.next();
389         HRegionLocation loc;
390         try {
391           if (r == null) {
392             throw new IllegalArgumentException("#" + id + ", row cannot be null");
393           }
394           // Make sure we get 0-s replica.
395           RegionLocations locs = connection.locateRegion(
396               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
397           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
398             throw new IOException("#" + id + ", no location found, aborting submit for"
399                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
400           }
401           loc = locs.getDefaultRegionLocation();
402         } catch (IOException ex) {
403           locationErrors = new ArrayList<Exception>();
404           locationErrorRows = new ArrayList<Integer>();
405           LOG.error("Failed to get region location ", ex);
406           // This action failed before creating ars. Retain it, but do not add to submit list.
407           // We will then add it to ars in an already-failed state.
408           retainedActions.add(new Action<Row>(r, ++posInList));
409           locationErrors.add(ex);
410           locationErrorRows.add(posInList);
411           it.remove();
412           break; // Backward compat: we stop considering actions on location error.
413         }
414 
415         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
416           Action<Row> action = new Action<Row>(r, ++posInList);
417           setNonce(ng, r, action);
418           retainedActions.add(action);
419           // TODO: replica-get is not supported on this path
420           byte[] regionName = loc.getRegionInfo().getRegionName();
421           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
422           it.remove();
423         }
424       }
425     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
426 
427     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
428 
429     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
430       locationErrors, locationErrorRows, actionsByServer, pool);
431   }
432 
433   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
434       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
435       Object[] results, boolean needResults, List<Exception> locationErrors,
436       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
437       ExecutorService pool) {
438     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
439       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
440     // Add location errors if any
441     if (locationErrors != null) {
442       for (int i = 0; i < locationErrors.size(); ++i) {
443         int originalIndex = locationErrorRows.get(i);
444         Row row = retainedActions.get(originalIndex).getAction();
445         ars.manageError(originalIndex, row,
446           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
447       }
448     }
449     ars.sendMultiAction(actionsByServer, 1, null, false);
450     return ars;
451   }
452 
453   /**
454    * Helper that is used when grouping the actions per region server.
455    *
456    * @param loc - the destination. Must not be null.
457    * @param action - the action to add to the multiaction
458    * @param actionsByServer the multiaction per server
459    * @param nonceGroup Nonce group.
460    */
461   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
462       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
463     MultiAction<Row> multiAction = actionsByServer.get(server);
464     if (multiAction == null) {
465       multiAction = new MultiAction<Row>();
466       actionsByServer.put(server, multiAction);
467     }
468     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
469       multiAction.setNonceGroup(nonceGroup);
470     }
471 
472     multiAction.add(regionName, action);
473   }
474 
475   /**
476    * Check if we should send new operations to this region or region server.
477    * We're taking into account the past decision; if we have already accepted
478    * operation on a given region, we accept all operations for this region.
479    *
480    * @param loc; the region and the server name we want to use.
481    * @return true if this region is considered as busy.
482    */
483   protected boolean canTakeOperation(HRegionLocation loc,
484                                      Map<HRegionInfo, Boolean> regionsIncluded,
485                                      Map<ServerName, Boolean> serversIncluded) {
486     HRegionInfo regionInfo = loc.getRegionInfo();
487     Boolean regionPrevious = regionsIncluded.get(regionInfo);
488 
489     if (regionPrevious != null) {
490       // We already know what to do with this region.
491       return regionPrevious;
492     }
493 
494     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
495     if (Boolean.FALSE.equals(serverPrevious)) {
496       // It's a new region, on a region server that we have already excluded.
497       regionsIncluded.put(regionInfo, Boolean.FALSE);
498       return false;
499     }
500 
501     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
502     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
503       // Too many tasks on this region already.
504       regionsIncluded.put(regionInfo, Boolean.FALSE);
505       return false;
506     }
507 
508     if (serverPrevious == null) {
509       // The region is ok, but we need to decide for this region server.
510       int newServers = 0; // number of servers we're going to contact so far
511       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
512         if (kv.getValue()) {
513           newServers++;
514         }
515       }
516 
517       // Do we have too many total tasks already?
518       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
519 
520       if (ok) {
521         // If the total is fine, is it ok for this individual server?
522         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
523         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
524       }
525 
526       if (!ok) {
527         regionsIncluded.put(regionInfo, Boolean.FALSE);
528         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
529         return false;
530       }
531 
532       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
533     } else {
534       assert serverPrevious.equals(Boolean.TRUE);
535     }
536 
537     regionsIncluded.put(regionInfo, Boolean.TRUE);
538 
539     return true;
540   }
541 
542   /**
543    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
544    * Uses default ExecutorService for this AP (must have been created with one).
545    */
546   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
547       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
548     return submitAll(null, tableName, rows, callback, results);
549   }
550 
551   /**
552    * Submit immediately the list of rows, whatever the server status. Kept for backward
553    * compatibility: it allows to be used with the batch interface that return an array of objects.
554    *
555    * @param pool ExecutorService to use.
556    * @param tableName name of the table for which the submission is made.
557    * @param rows the list of rows.
558    * @param callback the callback.
559    * @param results Optional array to return the results thru; backward compat.
560    */
561   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
562       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
564 
565     // The position will be used by the processBatch to match the object array returned.
566     int posInList = -1;
567     NonceGenerator ng = this.connection.getNonceGenerator();
568     for (Row r : rows) {
569       posInList++;
570       if (r instanceof Put) {
571         Put put = (Put) r;
572         if (put.isEmpty()) {
573           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
574         }
575       }
576       Action<Row> action = new Action<Row>(r, posInList);
577       setNonce(ng, r, action);
578       actions.add(action);
579     }
580     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
581         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
582     ars.groupAndSendMultiAction(actions, 1);
583     return ars;
584   }
585 
586   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
587     if (!(r instanceof Append) && !(r instanceof Increment)) return;
588     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
589   }
590 
591   /**
592    * The context, and return value, for a single submit/submitAll call.
593    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
594    * by server; request is sent to each server in parallel; the RPC calls are not async so a
595    * thread per server is used. Every time some actions fail, regions/locations might have
596    * changed, so we re-group them by server and region again and send these groups in parallel
597    * too. The result, in case of retries, is a "tree" of threads, with parent exiting after
598    * scheduling children. This is why lots of code doesn't require any synchronization.
599    */
600   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
601 
602     /**
603      * Runnable (that can be submitted to thread pool) that waits for when it's time
604      * to issue replica calls, finds region replicas, groups the requests by replica and
605      * issues the calls (on separate threads, via sendMultiAction).
606      * This is done on a separate thread because we don't want to wait on user thread for
607      * our asynchronous call, and usually we have to wait before making replica calls.
608      */
609     private final class ReplicaCallIssuingRunnable implements Runnable {
610       private final long startTime;
611       private final List<Action<Row>> initialActions;
612 
613       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
614         this.initialActions = initialActions;
615         this.startTime = startTime;
616       }
617 
618       @Override
619       public void run() {
620         boolean done = false;
621         if (primaryCallTimeoutMicroseconds > 0) {
622           try {
623             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
624           } catch (InterruptedException ex) {
625             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
626             return;
627           }
628         }
629         if (done) return; // Done within primary timeout
630         Map<ServerName, MultiAction<Row>> actionsByServer =
631             new HashMap<ServerName, MultiAction<Row>>();
632         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
633         if (replicaGetIndices == null) {
634           for (int i = 0; i < results.length; ++i) {
635             addReplicaActions(i, actionsByServer, unknownLocActions);
636           }
637         } else {
638           for (int replicaGetIndice : replicaGetIndices) {
639             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
640           }
641         }
642         if (!actionsByServer.isEmpty()) {
643           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
644         }
645         if (!unknownLocActions.isEmpty()) {
646           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
647           for (Action<Row> action : unknownLocActions) {
648             addReplicaActionsAgain(action, actionsByServer);
649           }
650           // Some actions may have completely failed, they are handled inside addAgain.
651           if (!actionsByServer.isEmpty()) {
652             sendMultiAction(actionsByServer, 1, null, true);
653           }
654         }
655       }
656 
657       /**
658        * Add replica actions to action map by server.
659        * @param index Index of the original action.
660        * @param actionsByServer The map by server to add it to.
661        */
662       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
663           List<Action<Row>> unknownReplicaActions) {
664         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
665         Action<Row> action = initialActions.get(index);
666         RegionLocations loc = findAllLocationsOrFail(action, true);
667         if (loc == null) return;
668         HRegionLocation[] locs = loc.getRegionLocations();
669         if (locs.length == 1) {
670           LOG.warn("No replicas found for " + action.getAction());
671           return;
672         }
673         synchronized (replicaResultLock) {
674           // Don't run replica calls if the original has finished. We could do it e.g. if
675           // original has already failed before first replica call (unlikely given retries),
676           // but that would require additional synchronization w.r.t. returning to caller.
677           if (results[index] != null) return;
678           // We set the number of calls here. After that any path must call setResult/setError.
679           // True even for replicas that are not found - if we refuse to send we MUST set error.
680           results[index] = new ReplicaResultState(locs.length);
681         }
682         for (int i = 1; i < locs.length; ++i) {
683           Action<Row> replicaAction = new Action<Row>(action, i);
684           if (locs[i] != null) {
685             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
686                 replicaAction, actionsByServer, nonceGroup);
687           } else {
688             unknownReplicaActions.add(replicaAction);
689           }
690         }
691       }
692 
693       private void addReplicaActionsAgain(
694           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
695         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
696           throw new AssertionError("Cannot have default replica here");
697         }
698         HRegionLocation loc = getReplicaLocationOrFail(action);
699         if (loc == null) return;
700         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
701             action, actionsByServer, nonceGroup);
702       }
703     }
704 
705     /**
706      * Runnable (that can be submitted to thread pool) that submits MultiAction to a
707      * single server. The server call is synchronous, therefore we do it on a thread pool.
708      */
709     private final class SingleServerRequestRunnable implements Runnable {
710       private final MultiAction<Row> multiAction;
711       private final int numAttempt;
712       private final ServerName server;
713       private final Set<MultiServerCallable<Row>> callsInProgress;
714 
715       private SingleServerRequestRunnable(
716           MultiAction<Row> multiAction, int numAttempt, ServerName server,
717           Set<MultiServerCallable<Row>> callsInProgress) {
718         this.multiAction = multiAction;
719         this.numAttempt = numAttempt;
720         this.server = server;
721         this.callsInProgress = callsInProgress;
722       }
723 
724       @Override
725       public void run() {
726         MultiResponse res;
727         MultiServerCallable<Row> callable = null;
728         try {
729           callable = createCallable(server, tableName, multiAction);
730           try {
731             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
732             if (callsInProgress != null) callsInProgress.add(callable);
733             res = caller.callWithoutRetries(callable, timeout);
734 
735             if (res == null) {
736               // Cancelled
737               return;
738             }
739 
740           } catch (IOException e) {
741             // The service itself failed . It may be an error coming from the communication
742             //   layer, but, as well, a functional error raised by the server.
743             receiveGlobalFailure(multiAction, server, numAttempt, e);
744             return;
745           } catch (Throwable t) {
746             // This should not happen. Let's log & retry anyway.
747             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
748                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
749             receiveGlobalFailure(multiAction, server, numAttempt, t);
750             return;
751           }
752 
753           // Normal case: we received an answer from the server, and it's not an exception.
754           receiveMultiAction(multiAction, server, res, numAttempt);
755         } catch (Throwable t) {
756           // Something really bad happened. We are on the send thread that will now die.
757           LOG.error("Internal AsyncProcess #" + id + " error for "
758               + tableName + " processing for " + server, t);
759           throw new RuntimeException(t);
760         } finally {
761           decTaskCounters(multiAction.getRegions(), server);
762           if (callsInProgress != null && callable != null) {
763             callsInProgress.remove(callable);
764           }
765         }
766       }
767     }
768 
/**
 * Optional user callback. When non-null it is invoked with the region name, the row and the
 * result of each action that succeeds.
 */
private final Batch.Callback<CResult> callback;
/** Error holder: globalErrors when that is non-null, otherwise a new BatchErrors. */
private final BatchErrors errors;
/** Per-server error tracker, consulted to decide whether to retry and how long to back off. */
private final ConnectionManager.ServerErrorTracker errorsByServer;
/** Executor to which the request runnables and the replica-call runnable are submitted. */
private final ExecutorService pool;
/**
 * Callables registered by the request runnables while their call runs. Null when the request
 * contains no replica gets.
 */
private final Set<MultiServerCallable<Row>> callsInProgress;


/** Target table. Null when a cross-table RPC call is made. */
private final TableName tableName;
/**
 * Initialised to the number of submitted actions by the constructor.
 * NOTE(review): presumably counts actions that are not finished yet - confirm against the
 * code that updates it.
 */
private final AtomicLong actionsInProgress = new AtomicLong(-1);
/**
 * The lock controls access to results. It is only held when populating results where
 * there might be several callers (eventual consistency gets). For other requests,
 * there's one unique call going on per result index.
 */
private final Object replicaResultLock = new Object();
/**
 * Result array.  Null if results are not needed. Otherwise, each index corresponds to
 * the action index in initial actions submitted. For most request types, has null-s for
 * requests that are not done, and result/exception for those that are done.
 * For eventual-consistency gets, initially the same applies; at some point, replica calls
 * might be started, and ReplicaResultState is put at the corresponding indices. The
 * returning calls check the type to detect when this is the case. After all calls are done,
 * ReplicaResultState-s are replaced with results for the user.
 */
private final Object[] results;
/**
 * Indices of replica gets in results. If null, all or no actions are replica-gets.
 */
private final int[] replicaGetIndices;
/** True when results are needed and at least one submitted action is a replica get. */
private final boolean hasAnyReplicaGets;
/** Nonce group passed along with every action added to a per-server MultiAction. */
private final long nonceGroup;
800 
801     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
802         ExecutorService pool, boolean needResults, Object[] results,
803         Batch.Callback<CResult> callback) {
804       this.pool = pool;
805       this.callback = callback;
806       this.nonceGroup = nonceGroup;
807       this.tableName = tableName;
808       this.actionsInProgress.set(actions.size());
809       if (results != null) {
810         assert needResults;
811         if (results.length != actions.size()) {
812           throw new AssertionError("results.length");
813         }
814         this.results = results;
815         for (int i = 0; i != this.results.length; ++i) {
816           results[i] = null;
817         }
818       } else {
819         this.results = needResults ? new Object[actions.size()] : null;
820       }
821       List<Integer> replicaGetIndices = null;
822       boolean hasAnyReplicaGets = false;
823       if (needResults) {
824         // Check to see if any requests might require replica calls.
825         // We expect that many requests will consist of all or no multi-replica gets; in such
826         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
827         // store the list of action indexes for which replica gets are possible, and set
828         // hasAnyReplicaGets to true.
829         boolean hasAnyNonReplicaReqs = false;
830         int posInList = 0;
831         for (Action<Row> action : actions) {
832           boolean isReplicaGet = isReplicaGet(action.getAction());
833           if (isReplicaGet) {
834             hasAnyReplicaGets = true;
835             if (hasAnyNonReplicaReqs) { // Mixed case
836               if (replicaGetIndices == null) {
837                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
838               }
839               replicaGetIndices.add(posInList);
840             }
841           } else if (!hasAnyNonReplicaReqs) {
842             // The first non-multi-replica request in the action list.
843             hasAnyNonReplicaReqs = true;
844             if (posInList > 0) {
845               // Add all the previous requests to the index lists. We know they are all
846               // replica-gets because this is the first non-multi-replica request in the list.
847               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
848               for (int i = 0; i < posInList; ++i) {
849                 replicaGetIndices.add(i);
850               }
851             }
852           }
853           ++posInList;
854         }
855       }
856       this.hasAnyReplicaGets = hasAnyReplicaGets;
857       if (replicaGetIndices != null) {
858         this.replicaGetIndices = new int[replicaGetIndices.size()];
859         int i = 0;
860         for (Integer el : replicaGetIndices) {
861           this.replicaGetIndices[i++] = el;
862         }
863       } else {
864         this.replicaGetIndices = null;
865       }
866       this.callsInProgress = !hasAnyReplicaGets ? null :
867           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
868 
869       this.errorsByServer = createServerErrorTracker();
870       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
871     }
872 
873     public Set<MultiServerCallable<Row>> getCallsInProgress() {
874       return callsInProgress;
875     }
876 
877     /**
878      * Group a list of actions per region servers, and send them.
879      *
880      * @param currentActions - the list of row to submit
881      * @param numAttempt - the current numAttempt (first attempt is 1)
882      */
883     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
884       Map<ServerName, MultiAction<Row>> actionsByServer =
885           new HashMap<ServerName, MultiAction<Row>>();
886 
887       boolean isReplica = false;
888       List<Action<Row>> unknownReplicaActions = null;
889       for (Action<Row> action : currentActions) {
890         RegionLocations locs = findAllLocationsOrFail(action, true);
891         if (locs == null) continue;
892         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
893         if (isReplica && !isReplicaAction) {
894           // This is the property of the current implementation, not a requirement.
895           throw new AssertionError("Replica and non-replica actions in the same retry");
896         }
897         isReplica = isReplicaAction;
898         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
899         if (loc == null || loc.getServerName() == null) {
900           if (isReplica) {
901             if (unknownReplicaActions == null) {
902               unknownReplicaActions = new ArrayList<Action<Row>>();
903             }
904             unknownReplicaActions.add(action);
905           } else {
906             // TODO: relies on primary location always being fetched
907             manageLocationError(action, null);
908           }
909         } else {
910           byte[] regionName = loc.getRegionInfo().getRegionName();
911           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
912         }
913       }
914       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
915       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
916 
917       if (!actionsByServer.isEmpty()) {
918         // If this is a first attempt to group and send, no replicas, we need replica thread.
919         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
920             ? currentActions : null, numAttempt > 1 && !hasUnknown);
921       }
922 
923       if (hasUnknown) {
924         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
925         for (Action<Row> action : unknownReplicaActions) {
926           HRegionLocation loc = getReplicaLocationOrFail(action);
927           if (loc == null) continue;
928           byte[] regionName = loc.getRegionInfo().getRegionName();
929           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
930         }
931         if (!actionsByServer.isEmpty()) {
932           sendMultiAction(
933               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
934         }
935       }
936     }
937 
938     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
939       // We are going to try get location once again. For each action, we'll do it once
940       // from cache, because the previous calls in the loop might populate it.
941       int replicaId = action.getReplicaId();
942       RegionLocations locs = findAllLocationsOrFail(action, true);
943       if (locs == null) return null; // manageError already called
944       HRegionLocation loc = locs.getRegionLocation(replicaId);
945       if (loc == null || loc.getServerName() == null) {
946         locs = findAllLocationsOrFail(action, false);
947         if (locs == null) return null; // manageError already called
948         loc = locs.getRegionLocation(replicaId);
949       }
950       if (loc == null || loc.getServerName() == null) {
951         manageLocationError(action, null);
952         return null;
953       }
954       return loc;
955     }
956 
957     private void manageLocationError(Action<Row> action, Exception ex) {
958       String msg = "Cannot get replica " + action.getReplicaId()
959           + " location for " + action.getAction();
960       LOG.error(msg);
961       if (ex == null) {
962         ex = new IOException(msg);
963       }
964       manageError(action.getOriginalIndex(), action.getAction(),
965           Retry.NO_LOCATION_PROBLEM, ex, null);
966     }
967 
968     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
969       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
970           ", row cannot be null");
971       RegionLocations loc = null;
972       try {
973         loc = connection.locateRegion(
974             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
975       } catch (IOException ex) {
976         manageLocationError(action, ex);
977       }
978       return loc;
979     }
980 
981     /**
982      * Send a multi action structure to the servers, after a delay depending on the attempt
983      * number. Asynchronous.
984      *
985      * @param actionsByServer the actions structured by regions
986      * @param numAttempt the attempt number.
987      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
988      */
989     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
990         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
991       // Run the last item on the same thread if we are already on a send thread.
992       // We hope most of the time it will be the only item, so we can cut down on threads.
993       int actionsRemaining = actionsByServer.size();
994       // This iteration is by server (the HRegionLocation comparator is by server portion only).
995       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
996         ServerName server = e.getKey();
997         MultiAction<Row> multiAction = e.getValue();
998         incTaskCounters(multiAction.getRegions(), server);
999         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1000             numAttempt);
1001         // make sure we correctly count the number of runnables before we try to reuse the send
1002         // thread, in case we had to split the request into different runnables because of backoff
1003         if (runnables.size() > actionsRemaining) {
1004           actionsRemaining = runnables.size();
1005         }
1006 
1007         // run all the runnables
1008         for (Runnable runnable : runnables) {
1009           if ((--actionsRemaining == 0) && reuseThread) {
1010             runnable.run();
1011           } else {
1012             try {
1013               pool.submit(runnable);
1014             } catch (Throwable t) {
1015               if (t instanceof RejectedExecutionException) {
1016                 // This should never happen. But as the pool is provided by the end user,
1017                // let's secure this a little.
1018                LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1019                   " Server is " + server.getServerName(), t);
1020               } else {
1021                 // see #HBASE-14359 for more details
1022                 LOG.warn("Caught unexpected exception/error: ", t);
1023               }
1024               decTaskCounters(multiAction.getRegions(), server);
1025               // We're likely to fail again, but this will increment the attempt counter,
1026              // so it will finish.
1027               receiveGlobalFailure(multiAction, server, numAttempt, t);
1028             }
1029           }
1030         }
1031       }
1032 
1033       if (actionsForReplicaThread != null) {
1034         startWaitingForReplicaCalls(actionsForReplicaThread);
1035       }
1036     }
1037 
1038     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1039         MultiAction<Row> multiAction,
1040         int numAttempt) {
1041       // no stats to manage, just do the standard action
1042       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1043         if (connection.getConnectionMetrics() != null) {
1044           connection.getConnectionMetrics().incrNormalRunners();
1045         }
1046         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1047             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1048       }
1049 
1050       // group the actions by the amount of delay
1051       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1052           .size());
1053 
1054       // split up the actions
1055       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1056         Long backoff = getBackoff(server, e.getKey());
1057         DelayingRunner runner = actions.get(backoff);
1058         if (runner == null) {
1059           actions.put(backoff, new DelayingRunner(backoff, e));
1060         } else {
1061           runner.add(e);
1062         }
1063       }
1064 
1065       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1066       for (DelayingRunner runner : actions.values()) {
1067         String traceText = "AsyncProcess.sendMultiAction";
1068         Runnable runnable =
1069             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1070                 callsInProgress);
1071         // use a delay runner only if we need to sleep for some time
1072         if (runner.getSleepTime() > 0) {
1073           runner.setRunner(runnable);
1074           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1075           runnable = runner;
1076           if (connection.getConnectionMetrics() != null) {
1077             connection.getConnectionMetrics().incrDelayRunners();
1078             connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1079           }
1080         } else {
1081           if (connection.getConnectionMetrics() != null) {
1082             connection.getConnectionMetrics().incrNormalRunners();
1083           }
1084         }
1085         runnable = Trace.wrap(traceText, runnable);
1086         toReturn.add(runnable);
1087 
1088       }
1089       return toReturn;
1090     }
1091 
1092     /**
1093      * @param server server location where the target region is hosted
1094      * @param regionName name of the region which we are going to write some data
1095      * @return the amount of time the client should wait until it submit a request to the
1096      * specified server and region
1097      */
1098     private Long getBackoff(ServerName server, byte[] regionName) {
1099       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1100       ServerStatistics stats = tracker.getStats(server);
1101       return AsyncProcess.this.connection.getBackoffPolicy()
1102           .getBackoffTime(server, regionName, stats);
1103     }
1104 
1105     /**
1106      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1107      */
1108     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1109       long startTime = EnvironmentEdgeManager.currentTime();
1110       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1111           actionsForReplicaThread, startTime);
1112       if (primaryCallTimeoutMicroseconds == 0) {
1113         // Start replica calls immediately.
1114         replicaRunnable.run();
1115       } else {
1116         // Start the thread that may kick off replica gets.
1117         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1118         try {
1119           pool.submit(replicaRunnable);
1120         } catch (RejectedExecutionException ree) {
1121           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1122         }
1123       }
1124     }
1125 
1126     /**
1127      * Check that we can retry acts accordingly: logs, set the error status.
1128      *
1129      * @param originalIndex the position in the list sent
1130      * @param row           the row
1131      * @param canRetry      if false, we won't retry whatever the settings.
1132      * @param throwable     the throwable, if any (can be null)
1133      * @param server        the location, if any (can be null)
1134      * @return true if the action can be retried, false otherwise.
1135      */
1136     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1137                                 Throwable throwable, ServerName server) {
1138       if (canRetry == Retry.YES
1139           && throwable != null && (throwable instanceof DoNotRetryIOException ||
1140           throwable instanceof NeedUnmanagedConnectionException)) {
1141         canRetry = Retry.NO_NOT_RETRIABLE;
1142       }
1143 
1144       if (canRetry != Retry.YES) {
1145         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1146         setError(originalIndex, row, throwable, server);
1147       } else if (isActionComplete(originalIndex, row)) {
1148         canRetry = Retry.NO_OTHER_SUCCEEDED;
1149       }
1150       return canRetry;
1151     }
1152 
1153     /**
1154      * Resubmit all the actions from this multiaction after a failure.
1155      *
1156      * @param rsActions  the actions still to do from the initial list
1157      * @param server   the destination
1158      * @param numAttempt the number of attempts so far
1159      * @param t the throwable (if any) that caused the resubmit
1160      */
1161     private void receiveGlobalFailure(
1162         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1163       errorsByServer.reportServerError(server);
1164       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1165           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1166 
1167       if (tableName == null) {
1168         // tableName is null when we made a cross-table RPC call.
1169         connection.clearCaches(server);
1170       }
1171       int failed = 0, stopped = 0;
1172       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1173       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1174         byte[] regionName = e.getKey();
1175         byte[] row = e.getValue().iterator().next().getAction().getRow();
1176         // Do not use the exception for updating cache because it might be coming from
1177         // any of the regions in the MultiAction.
1178         if (tableName != null) {
1179           connection.updateCachedLocations(tableName, regionName, row,
1180             ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1181         }
1182         for (Action<Row> action : e.getValue()) {
1183           Retry retry = manageError(
1184               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1185           if (retry == Retry.YES) {
1186             toReplay.add(action);
1187           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1188             ++stopped;
1189           } else {
1190             ++failed;
1191           }
1192         }
1193       }
1194 
1195       if (toReplay.isEmpty()) {
1196         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1197       } else {
1198         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1199       }
1200     }
1201 
1202     /**
1203      * Log as much info as possible, and, if there is something to replay,
1204      * submit it again after a back off sleep.
1205      */
1206     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1207         int numAttempt, int failureCount, Throwable throwable) {
1208       // We have something to replay. We're going to sleep a little before.
1209 
1210       // We have two contradicting needs here:
1211       //  1) We want to get the new location after having slept, as it may change.
1212       //  2) We want to take into account the location when calculating the sleep time.
1213       //  3) If all this is just because the response needed to be chunked try again FAST.
1214       // It should be possible to have some heuristics to take the right decision. Short term,
1215       //  we go for one.
1216       boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1217       int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1218       long backOffTime = retryImmediately ? 0 :
1219           errorsByServer.calculateBackoffTime(oldServer, pause);
1220       if (numAttempt > startLogErrorsCnt) {
1221         // We use this value to have some logs when we have multiple failures, but not too many
1222         //  logs, as errors are to be expected when a region moves, splits and so on
1223         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1224             oldServer, throwable, backOffTime, true, null, -1, -1));
1225       }
1226 
1227       try {
1228         if (backOffTime > 0) {
1229           Thread.sleep(backOffTime);
1230         }
1231       } catch (InterruptedException e) {
1232         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1233         Thread.currentThread().interrupt();
1234         return;
1235       }
1236 
1237       groupAndSendMultiAction(toReplay, nextAttemptNumber);
1238     }
1239 
1240     private void logNoResubmit(ServerName oldServer, int numAttempt,
1241         int failureCount, Throwable throwable, int failed, int stopped) {
1242       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1243         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1244         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1245             throwable, -1, false, timeStr, failed, stopped);
1246         if (failed != 0) {
1247           // Only log final failures as warning
1248           LOG.warn(logMessage);
1249         } else {
1250           LOG.info(logMessage);
1251         }
1252       }
1253     }
1254 
1255     /**
1256      * Called when we receive the result of a server query.
1257      *
1258      * @param multiAction    - the multiAction we sent
1259      * @param server       - the location. It's used as a server name.
1260      * @param responses      - the response, if any
1261      * @param numAttempt     - the attempt
1262      */
1263     private void receiveMultiAction(MultiAction<Row> multiAction,
1264         ServerName server, MultiResponse responses, int numAttempt) {
1265        assert responses != null;
1266 
1267       // Success or partial success
1268       // Analyze detailed results. We can still have individual failures to be redo.
1269       // two specific throwables are managed:
1270       //  - DoNotRetryIOException: we continue to retry for other actions
1271       //  - RegionMovedException: we update the cache with the new region location
1272 
1273       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1274       Throwable throwable = null;
1275       int failureCount = 0;
1276       boolean canRetry = true;
1277 
1278       // Go by original action.
1279       int failed = 0, stopped = 0;
1280       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1281         byte[] regionName = regionEntry.getKey();
1282         Map<Integer, Object> regionResults = responses.getResults().get(regionName);
1283         if (regionResults == null) {
1284           if (!responses.getExceptions().containsKey(regionName)) {
1285             LOG.error("Server sent us neither results nor exceptions for "
1286                 + Bytes.toStringBinary(regionName));
1287             responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
1288           }
1289           continue;
1290         }
1291         boolean regionFailureRegistered = false;
1292         for (Action<Row> sentAction : regionEntry.getValue()) {
1293           Object result = regionResults.get(sentAction.getOriginalIndex());
1294           // Failure: retry if it's make sense else update the errors lists
1295           if (result == null || result instanceof Throwable) {
1296             Row row = sentAction.getAction();
1297             throwable = ClientExceptionsUtil.findException(result);
1298             // Register corresponding failures once per server/once per region.
1299             if (!regionFailureRegistered) {
1300               regionFailureRegistered = true;
1301               connection.updateCachedLocations(
1302                   tableName, regionName, row.getRow(), result, server);
1303             }
1304             if (failureCount == 0) {
1305               errorsByServer.reportServerError(server);
1306               // We determine canRetry only once for all calls, after reporting server failure.
1307               canRetry = errorsByServer.canRetryMore(numAttempt);
1308             }
1309             ++failureCount;
1310             Retry retry = manageError(sentAction.getOriginalIndex(), row,
1311                 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
1312             if (retry == Retry.YES) {
1313               toReplay.add(sentAction);
1314             } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1315               ++stopped;
1316             } else {
1317               ++failed;
1318             }
1319           } else {
1320             
1321             if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
1322               AsyncProcess.this.connection.getConnectionMetrics().
1323                       updateServerStats(server, regionName, result);
1324             }
1325 
1326             // update the stats about the region, if its a user table. We don't want to slow down
1327             // updates to meta tables, especially from internal updates (master, etc).
1328             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1329               result = ResultStatsUtil.updateStats(result,
1330                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1331             }
1332 
1333             if (callback != null) {
1334               try {
1335                 //noinspection unchecked
1336                 // TODO: would callback expect a replica region name if it gets one?
1337                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1338               } catch (Throwable t) {
1339                 LOG.error("User callback threw an exception for "
1340                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1341               }
1342             }
1343             setResult(sentAction, result);
1344           }
1345         }
1346       }
1347 
1348       // The failures global to a region. We will use for multiAction we sent previously to find the
1349       //   actions to replay.
1350       for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
1351         throwable = throwableEntry.getValue();
1352         byte[] region = throwableEntry.getKey();
1353         List<Action<Row>> actions = multiAction.actions.get(region);
1354         if (actions == null || actions.isEmpty()) {
1355           throw new IllegalStateException("Wrong response for the region: " +
1356               HRegionInfo.encodeRegionName(region));
1357         }
1358 
1359         if (failureCount == 0) {
1360           errorsByServer.reportServerError(server);
1361           canRetry = errorsByServer.canRetryMore(numAttempt);
1362         }
1363         if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
1364           // For multi-actions, we don't have a table name, but we want to make sure to clear the
1365           // cache in case there were location-related exceptions. We don't want to clear the
1366           // cache for every possible exception that comes through, however.
1367           connection.clearCaches(server);
1368         } else {
1369           connection.updateCachedLocations(
1370               tableName, region, actions.get(0).getAction().getRow(), throwable, server);
1371         }
1372         failureCount += actions.size();
1373 
1374         for (Action<Row> action : actions) {
1375           Row row = action.getAction();
1376           Retry retry = manageError(action.getOriginalIndex(), row,
1377               canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
1378           if (retry == Retry.YES) {
1379             toReplay.add(action);
1380           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1381             ++stopped;
1382           } else {
1383             ++failed;
1384           }
1385         }
1386       }
1387 
1388       if (toReplay.isEmpty()) {
1389         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1390       } else {
1391         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1392       }
1393     }
1394 
1395     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1396         Throwable error, long backOffTime, boolean willRetry, String startTime,
1397         int failed, int stopped) {
1398       StringBuilder sb = new StringBuilder();
1399       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1400         .append("attempt=").append(numAttempt)
1401         .append("/").append(numTries).append(" ");
1402 
1403       if (failureCount > 0 || error != null){
1404         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1405             append(error == null ? "null" : error);
1406       } else {
1407         sb.append("succeeded");
1408       }
1409 
1410       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1411 
1412       if (willRetry) {
1413         sb.append(", retrying after=").append(backOffTime).append("ms").
1414             append(", replay=").append(replaySize).append("ops");
1415       } else if (failureCount > 0) {
1416         if (stopped > 0) {
1417           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1418         }
1419         if (failed > 0) {
1420           sb.append("; not retrying ").append(failed).append(" - final failure");
1421         }
1422 
1423       }
1424 
1425       return sb.toString();
1426     }
1427 
1428     /**
1429      * Sets the non-error result from a particular action.
1430      * @param action Action (request) that the server responded to.
1431      * @param result The result.
1432      */
1433     private void setResult(Action<Row> action, Object result) {
1434       if (result == null) {
1435         throw new RuntimeException("Result cannot be null");
1436       }
1437       ReplicaResultState state = null;
1438       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1439       int index = action.getOriginalIndex();
1440       if (results == null) {
1441          decActionCounter(index);
1442          return; // Simple case, no replica requests.
1443       } else if ((state = trySetResultSimple(
1444           index, action.getAction(), false, result, null, isStale)) == null) {
1445         return; // Simple case, no replica requests.
1446       }
1447       assert state != null;
1448       // At this point we know that state is set to replica tracking class.
1449       // It could be that someone else is also looking at it; however, we know there can
1450       // only be one state object, and only one thread can set callCount to 0. Other threads
1451       // will either see state with callCount 0 after locking it; or will not see state at all
1452       // we will replace it with the result.
1453       synchronized (state) {
1454         if (state.callCount == 0) {
1455           return; // someone already set the result
1456         }
1457         state.callCount = 0;
1458       }
1459       synchronized (replicaResultLock) {
1460         if (results[index] != state) {
1461           throw new AssertionError("We set the callCount but someone else replaced the result");
1462         }
1463         results[index] = result;
1464       }
1465 
1466       decActionCounter(index);
1467     }
1468 
1469     /**
1470      * Sets the error from a particular action.
1471      * @param index Original action index.
1472      * @param row Original request.
1473      * @param throwable The resulting error.
1474      * @param server The source server.
1475      */
1476     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1477       ReplicaResultState state = null;
1478       if (results == null) {
1479         // Note that we currently cannot have replica requests with null results. So it shouldn't
1480         // happen that multiple replica calls will call dAC for same actions with results == null.
1481         // Only one call per action should be present in this case.
1482         errors.add(throwable, row, server);
1483         decActionCounter(index);
1484         return; // Simple case, no replica requests.
1485       } else if ((state = trySetResultSimple(
1486           index, row, true, throwable, server, false)) == null) {
1487         return; // Simple case, no replica requests.
1488       }
1489       assert state != null;
1490       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1491       boolean isActionDone = false;
1492       synchronized (state) {
1493         switch (state.callCount) {
1494           case 0: return; // someone already set the result
1495           case 1: { // All calls failed, we are the last error.
1496             target = errors;
1497             isActionDone = true;
1498             break;
1499           }
1500           default: {
1501             assert state.callCount > 1;
1502             if (state.replicaErrors == null) {
1503               state.replicaErrors = new BatchErrors();
1504             }
1505             target = state.replicaErrors;
1506             break;
1507           }
1508         }
1509         --state.callCount;
1510       }
1511       target.add(throwable, row, server);
1512       if (isActionDone) {
1513         if (state.replicaErrors != null) { // last call, no need to lock
1514           errors.merge(state.replicaErrors);
1515         }
1516         // See setResult for explanations.
1517         synchronized (replicaResultLock) {
1518           if (results[index] != state) {
1519             throw new AssertionError("We set the callCount but someone else replaced the result");
1520           }
1521           results[index] = throwable;
1522         }
1523         decActionCounter(index);
1524       }
1525     }
1526 
1527     /**
1528      * Checks if the action is complete; used on error to prevent needless retries.
1529      * Does not synchronize, assuming element index/field accesses are atomic.
1530      * This is an opportunistic optimization check, doesn't have to be strict.
1531      * @param index Original action index.
1532      * @param row Original request.
1533      */
1534     private boolean isActionComplete(int index, Row row) {
1535       if (!isReplicaGet(row)) return false;
1536       Object resObj = results[index];
1537       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1538           || ((ReplicaResultState)resObj).callCount == 0);
1539     }
1540 
1541     /**
1542      * Tries to set the result or error for a particular action as if there were no replica calls.
1543      * @return null if successful; replica state if there were in fact replica calls.
1544      */
1545     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1546         Object result, ServerName server, boolean isFromReplica) {
1547       Object resObj = null;
1548       if (!isReplicaGet(row)) {
1549         if (isFromReplica) {
1550           throw new AssertionError("Unexpected stale result for " + row);
1551         }
1552         results[index] = result;
1553       } else {
1554         synchronized (replicaResultLock) {
1555           if ((resObj = results[index]) == null) {
1556             if (isFromReplica) {
1557               throw new AssertionError("Unexpected stale result for " + row);
1558             }
1559             results[index] = result;
1560           }
1561         }
1562       }
1563 
1564       ReplicaResultState rrs =
1565           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1566       if (rrs == null && isError) {
1567         // The resObj is not replica state (null or already set).
1568         errors.add((Throwable)result, row, server);
1569       }
1570 
1571       if (resObj == null) {
1572         // resObj is null - no replica calls were made.
1573         decActionCounter(index);
1574         return null;
1575       }
1576       return rrs;
1577     }
1578 
1579     private void decActionCounter(int index) {
1580       long actionsRemaining = actionsInProgress.decrementAndGet();
1581       if (actionsRemaining < 0) {
1582         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1583         throw new AssertionError(error);
1584       } else if (actionsRemaining == 0) {
1585         synchronized (actionsInProgress) {
1586           actionsInProgress.notifyAll();
1587         }
1588       }
1589     }
1590 
1591     private String buildDetailedErrorMsg(String string, int index) {
1592       StringBuilder error = new StringBuilder(string);
1593       error.append("; called for ").
1594         append(index).
1595         append(", actionsInProgress ").
1596         append(actionsInProgress.get()).
1597         append("; replica gets: ");
1598       if (replicaGetIndices != null) {
1599         for (int i = 0; i < replicaGetIndices.length; ++i) {
1600           error.append(replicaGetIndices[i]).append(", ");
1601         }
1602       } else {
1603         error.append(hasAnyReplicaGets ? "all" : "none");
1604       }
1605       error.append("; results ");
1606       if (results != null) {
1607         for (int i = 0; i < results.length; ++i) {
1608           Object o = results[i];
1609           error.append(((o == null) ? "null" : o.toString())).append(", ");
1610         }
1611       }
1612       return error.toString();
1613     }
1614 
1615     @Override
1616     public void waitUntilDone() throws InterruptedIOException {
1617       try {
1618         waitUntilDone(Long.MAX_VALUE);
1619       } catch (InterruptedException iex) {
1620         throw new InterruptedIOException(iex.getMessage());
1621       } finally {
1622         if (callsInProgress != null) {
1623           for (MultiServerCallable<Row> clb : callsInProgress) {
1624             clb.cancel();
1625           }
1626         }
1627       }
1628     }
1629 
1630     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1631       boolean hasWait = cutoff != Long.MAX_VALUE;
1632       long lastLog = EnvironmentEdgeManager.currentTime();
1633       long currentInProgress;
1634       while (0 != (currentInProgress = actionsInProgress.get())) {
1635         long now = EnvironmentEdgeManager.currentTime();
1636         if (hasWait && (now * 1000L) > cutoff) {
1637           return false;
1638         }
1639         if (!hasWait) { // Only log if wait is infinite.
1640           if (now > lastLog + 10000) {
1641             lastLog = now;
1642             LOG.info("#" + id + ", waiting for " + currentInProgress + "  actions to finish");
1643           }
1644         }
1645         synchronized (actionsInProgress) {
1646           if (actionsInProgress.get() == 0) break;
1647           if (!hasWait) {
1648             actionsInProgress.wait(100);
1649           } else {
1650             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1651             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1652           }
1653         }
1654       }
1655       return true;
1656     }
1657 
1658     @Override
1659     public boolean hasError() {
1660       return errors.hasErrors();
1661     }
1662 
1663     @Override
1664     public List<? extends Row> getFailedOperations() {
1665       return errors.actions;
1666     }
1667 
1668     @Override
1669     public RetriesExhaustedWithDetailsException getErrors() {
1670       return errors.makeException();
1671     }
1672 
1673     @Override
1674     public Object[] getResults() throws InterruptedIOException {
1675       waitUntilDone();
1676       return results;
1677     }
1678   }
1679 
1680   @VisibleForTesting
1681   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1682   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1683       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1684       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1685     return new AsyncRequestFutureImpl<CResult>(
1686         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1687   }
1688 
1689   /**
1690    * Create a callable. Isolated to be easily overridden in the tests.
1691    */
1692   @VisibleForTesting
1693   protected MultiServerCallable<Row> createCallable(final ServerName server,
1694       TableName tableName, final MultiAction<Row> multi) {
1695     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1696   }
1697 
1698   /**
1699    * Create a caller. Isolated to be easily overridden in the tests.
1700    */
1701   @VisibleForTesting
1702   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1703     return rpcCallerFactory.<MultiResponse> newCaller();
1704   }
1705 
1706   @VisibleForTesting
1707   /** Waits until all outstanding tasks are done. Used in tests. */
1708   void waitUntilDone() throws InterruptedIOException {
1709     waitForMaximumCurrentTasks(0);
1710   }
1711 
1712   /** Wait until the async does not have more than max tasks in progress. */
1713   private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
1714     long lastLog = EnvironmentEdgeManager.currentTime();
1715     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1716     while ((currentInProgress = this.tasksInProgress.get()) > max) {
1717       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
1718         long now = EnvironmentEdgeManager.currentTime();
1719         if (now > lastLog + 10000) {
1720           lastLog = now;
1721           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1722               + max + ", tasksInProgress=" + currentInProgress);
1723         }
1724       }
1725       oldInProgress = currentInProgress;
1726       try {
1727         synchronized (this.tasksInProgress) {
1728           if (tasksInProgress.get() != oldInProgress) break;
1729           this.tasksInProgress.wait(100);
1730         }
1731       } catch (InterruptedException e) {
1732         throw new InterruptedIOException("#" + id + ", interrupted." +
1733             " currentNumberOfTask=" + currentInProgress);
1734       }
1735     }
1736   }
1737 
1738   /**
1739    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1740    * @return Whether there were any errors in any request since the last time
1741    *          {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created.
1742    */
1743   public boolean hasError() {
1744     return globalErrors.hasErrors();
1745   }
1746 
1747   /**
1748    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1749    * Waits for all previous operations to finish, and returns errors and (optionally)
1750    * failed operations themselves.
1751    * @param failedRows an optional list into which the rows that failed since the last time
1752    *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
1753    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
1754    *          was called, or AP was created.
1755    */
1756   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1757       List<Row> failedRows) throws InterruptedIOException {
1758     waitForMaximumCurrentTasks(0);
1759     if (!globalErrors.hasErrors()) {
1760       return null;
1761     }
1762     if (failedRows != null) {
1763       failedRows.addAll(globalErrors.actions);
1764     }
1765     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1766     globalErrors.clear();
1767     return result;
1768   }
1769 
1770   /**
1771    * increment the tasks counters for a given set of regions. MT safe.
1772    */
1773   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1774     tasksInProgress.incrementAndGet();
1775 
1776     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1777     if (serverCnt == null) {
1778       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1779       serverCnt = taskCounterPerServer.get(sn);
1780     }
1781     serverCnt.incrementAndGet();
1782 
1783     for (byte[] regBytes : regions) {
1784       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1785       if (regionCnt == null) {
1786         regionCnt = new AtomicInteger();
1787         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1788         if (oldCnt != null) {
1789           regionCnt = oldCnt;
1790         }
1791       }
1792       regionCnt.incrementAndGet();
1793     }
1794   }
1795 
1796   /**
1797    * Decrements the counters for a given region and the region server. MT Safe.
1798    */
1799   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1800     for (byte[] regBytes : regions) {
1801       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1802       regionCnt.decrementAndGet();
1803     }
1804 
1805     taskCounterPerServer.get(sn).decrementAndGet();
1806     tasksInProgress.decrementAndGet();
1807     synchronized (tasksInProgress) {
1808       tasksInProgress.notifyAll();
1809     }
1810   }
1811 
1812   /**
1813    * Creates the server error tracker to use inside process.
1814    * Currently, to preserve the main assumption about current retries, and to work well with
1815    * the retry-limit-based calculation, the calculation is local per Process object.
1816    * We may benefit from connection-wide tracking of server errors.
1817    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
1818    */
1819   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1820     return new ConnectionManager.ServerErrorTracker(
1821         this.serverTrackerTimeout, this.numTries);
1822   }
1823 
1824   private static boolean isReplicaGet(Row row) {
1825     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1826   }
1827 
1828   /**
1829    * For manageError. Only used to make logging more clear, we don't actually care why we don't retry.
1830    */
1831   private enum Retry {
1832     YES,
1833     NO_LOCATION_PROBLEM,
1834     NO_NOT_RETRIABLE,
1835     NO_RETRIES_EXHAUSTED,
1836     NO_OTHER_SUCCEEDED
1837   }
1838 }