1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.ConcurrentSkipListMap;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.DoNotRetryIOException;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.cloudera.htrace.Trace;
54
55 import com.google.common.base.Preconditions;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 class AsyncProcess<CResult> {
94 private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
95 protected static final AtomicLong COUNTER = new AtomicLong();
96 protected final long id;
97 private final int startLogErrorsCnt;
98 protected final HConnection hConnection;
99 protected final TableName tableName;
100 protected final ExecutorService pool;
101 protected final AsyncProcessCallback<CResult> callback;
102 protected final BatchErrors errors = new BatchErrors();
103 protected final AtomicBoolean hasError = new AtomicBoolean(false);
104 protected final AtomicLong tasksSent = new AtomicLong(0);
105 protected final AtomicLong tasksDone = new AtomicLong(0);
106 protected final AtomicLong retriesCnt = new AtomicLong(0);
107 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
108 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
109 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
110 new ConcurrentHashMap<ServerName, AtomicInteger>();
111 protected final int timeout;
112
113
114
115
116 protected final int maxTotalConcurrentTasks;
117
118
119
120
121
122
123
124 protected final int maxConcurrentTasksPerRegion;
125
126
127
128
129 protected final int maxConcurrentTasksPerServer;
130 protected final long pause;
131 protected int numTries;
132 protected int serverTrackerTimeout;
133 protected RpcRetryingCallerFactory rpcCallerFactory;
134 private RpcControllerFactory rpcFactory;
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 interface AsyncProcessCallback<CResult> {
151
152
153
154
155 void success(int originalIndex, byte[] region, Row row, CResult result);
156
157
158
159
160
161
162
163
164
165 boolean failure(int originalIndex, byte[] region, Row row, Throwable t);
166
167
168
169
170
171
172
173 boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
174 }
175
176 private static class BatchErrors {
177 private final List<Throwable> throwables = new ArrayList<Throwable>();
178 private final List<Row> actions = new ArrayList<Row>();
179 private final List<String> addresses = new ArrayList<String>();
180
181 public synchronized void add(Throwable ex, Row row, HRegionLocation location) {
182 if (row == null){
183 throw new IllegalArgumentException("row cannot be null. location=" + location);
184 }
185
186 throwables.add(ex);
187 actions.add(row);
188 addresses.add(location != null ? location.getServerName().toString() : "null location");
189 }
190
191 private synchronized RetriesExhaustedWithDetailsException makeException() {
192 return new RetriesExhaustedWithDetailsException(
193 new ArrayList<Throwable>(throwables),
194 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
195 }
196
197 public synchronized void clear() {
198 throwables.clear();
199 actions.clear();
200 addresses.clear();
201 }
202 }
203
/**
 * Creates a processor bound to one connection and one table, reading its limits from the
 * configuration.
 *
 * @param hc connection used to locate regions and to obtain the nonce generator; not null
 * @param tableName table the operations are addressed to
 * @param pool executor on which the per-server tasks are run
 * @param callback listener for per-operation outcomes; may be null
 * @param conf source of the pause, retry count, rpc timeout and concurrency limits
 * @param rpcCaller factory for the rpc callers
 * @param rpcFactory factory handed to the multi-server callables; not null
 * @throws IllegalArgumentException if {@code hc} is null or if one of the configured
 *           concurrency limits is not strictly positive
 * @throws NullPointerException if {@code rpcFactory} is null
 */
public AsyncProcess(HConnection hc, TableName tableName, ExecutorService pool,
    AsyncProcessCallback<CResult> callback, Configuration conf,
    RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
  if (hc == null) {
    throw new IllegalArgumentException("HConnection cannot be null.");
  }

  this.hConnection = hc;
  this.tableName = tableName;
  this.pool = pool;
  this.callback = callback;

  this.id = COUNTER.incrementAndGet();

  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

  this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
  this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
  this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

  this.startLogErrorsCnt = conf.getInt("hbase.client.start.log.errors.counter", 9);

  if (this.maxTotalConcurrentTasks <= 0) {
    throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
  }
  if (this.maxConcurrentTasksPerServer <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
        maxConcurrentTasksPerServer);
  }
  if (this.maxConcurrentTasksPerRegion <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
        maxConcurrentTasksPerRegion);
  }

  // The tracker timeout is the total pause time over all the tries. The pause times are
  // longs, so they are summed in a long and clamped: a compound "+=" on the int field would
  // narrow silently and could wrap to a meaningless (even negative) timeout when the retry
  // count or the pause is large. Once the cap is reached further terms cannot change the
  // result, so the loop stops early.
  long trackerTimeout = 0;
  for (int i = 0; i < this.numTries && trackerTimeout < Integer.MAX_VALUE; ++i) {
    trackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
  }
  this.serverTrackerTimeout = (int) Math.min(trackerTimeout, Integer.MAX_VALUE);

  this.rpcCallerFactory = rpcCaller;
  Preconditions.checkNotNull(rpcFactory);
  this.rpcFactory = rpcFactory;
}
266
267
268
269
270
271
272
273
274 public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
275 if (rows.isEmpty()) {
276 return;
277 }
278
279
280
281 Map<HRegionLocation, MultiAction<Row>> actionsByServer =
282 new HashMap<HRegionLocation, MultiAction<Row>>();
283 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
284
285 long currentTaskCnt = tasksDone.get();
286 boolean alreadyLooped = false;
287
288 NonceGenerator ng = this.hConnection.getNonceGenerator();
289 do {
290 if (alreadyLooped){
291
292 waitForNextTaskDone(currentTaskCnt);
293 currentTaskCnt = tasksDone.get();
294 } else {
295 alreadyLooped = true;
296 }
297
298
299 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
300
301
302
303 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
304 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
305
306 int posInList = -1;
307 Iterator<? extends Row> it = rows.iterator();
308 while (it.hasNext()) {
309 Row r = it.next();
310 HRegionLocation loc = findDestLocation(r, posInList);
311
312 if (loc == null) {
313 it.remove();
314 } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
315 Action<Row> action = new Action<Row>(r, ++posInList);
316 setNonce(ng, r, action);
317 retainedActions.add(action);
318 addAction(loc, action, actionsByServer, ng);
319 it.remove();
320 }
321 }
322 } while (retainedActions.isEmpty() && atLeastOne && !hasError());
323
324 HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
325 sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
326 }
327
328
329
330
331
332
333
334
335
336 private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
337 MultiAction<Row>> actionsByServer, NonceGenerator ng) {
338 final byte[] regionName = loc.getRegionInfo().getRegionName();
339 MultiAction<Row> multiAction = actionsByServer.get(loc);
340 if (multiAction == null) {
341 multiAction = new MultiAction<Row>();
342 actionsByServer.put(loc, multiAction);
343 }
344 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
345
346
347 multiAction.setNonceGroup(ng.getNonceGroup());
348 }
349
350 multiAction.add(regionName, action);
351 }
352
353
354
355
356
357
358
359
360 private HRegionLocation findDestLocation(Row row, int posInList) {
361 if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
362 HRegionLocation loc = null;
363 IOException locationException = null;
364 try {
365 loc = hConnection.locateRegion(this.tableName, row.getRow());
366 if (loc == null) {
367 locationException = new IOException("#" + id + ", no location found, aborting submit for" +
368 " tableName=" + tableName +
369 " rowkey=" + Arrays.toString(row.getRow()));
370 }
371 } catch (IOException e) {
372 locationException = e;
373 }
374 if (locationException != null) {
375
376
377 manageError(posInList, row, false, locationException, null);
378 return null;
379 }
380
381 return loc;
382 }
383
384
385
386
387
388
389
390
391
392 protected boolean canTakeOperation(HRegionLocation loc,
393 Map<Long, Boolean> regionsIncluded,
394 Map<ServerName, Boolean> serversIncluded) {
395 long regionId = loc.getRegionInfo().getRegionId();
396 Boolean regionPrevious = regionsIncluded.get(regionId);
397
398 if (regionPrevious != null) {
399
400 return regionPrevious;
401 }
402
403 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
404 if (Boolean.FALSE.equals(serverPrevious)) {
405
406 regionsIncluded.put(regionId, Boolean.FALSE);
407 return false;
408 }
409
410 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
411 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
412
413 regionsIncluded.put(regionId, Boolean.FALSE);
414 return false;
415 }
416
417 if (serverPrevious == null) {
418
419 int newServers = 0;
420 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
421 if (kv.getValue()) {
422 newServers++;
423 }
424 }
425
426
427 boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;
428
429 if (ok) {
430
431 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
432 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
433 }
434
435 if (!ok) {
436 regionsIncluded.put(regionId, Boolean.FALSE);
437 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
438 return false;
439 }
440
441 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
442 } else {
443 assert serverPrevious.equals(Boolean.TRUE);
444 }
445
446 regionsIncluded.put(regionId, Boolean.TRUE);
447
448 return true;
449 }
450
451
452
453
454
455
456
457 public void submitAll(List<? extends Row> rows) {
458 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
459
460
461 int posInList = -1;
462 NonceGenerator ng = this.hConnection.getNonceGenerator();
463 for (Row r : rows) {
464 posInList++;
465 if (r instanceof Put) {
466 Put put = (Put) r;
467 if (put.isEmpty()) {
468 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
469 }
470 }
471 Action<Row> action = new Action<Row>(r, posInList);
472 setNonce(ng, r, action);
473 actions.add(action);
474 }
475 HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
476 submit(actions, actions, 1, errorsByServer);
477 }
478
479 private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
480 if (!(r instanceof Append) && !(r instanceof Increment)) return;
481 action.setNonce(ng.newNonce());
482 }
483
484
485
486
487
488
489
490
491
492
493 private void submit(List<Action<Row>> initialActions,
494 List<Action<Row>> currentActions, int numAttempt,
495 final HConnectionManager.ServerErrorTracker errorsByServer) {
496
497 if (numAttempt > 1){
498 retriesCnt.incrementAndGet();
499 }
500
501
502 final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
503 new HashMap<HRegionLocation, MultiAction<Row>>();
504
505 NonceGenerator ng = this.hConnection.getNonceGenerator();
506 for (Action<Row> action : currentActions) {
507 HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
508 if (loc != null) {
509 addAction(loc, action, actionsByServer, ng);
510 }
511 }
512
513 if (!actionsByServer.isEmpty()) {
514 sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer);
515 }
516 }
517
518
519
520
521
522
523
524
525
526 public void sendMultiAction(final List<Action<Row>> initialActions,
527 Map<HRegionLocation, MultiAction<Row>> actionsByServer,
528 final int numAttempt,
529 final HConnectionManager.ServerErrorTracker errorsByServer) {
530
531
532 for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
533 final HRegionLocation loc = e.getKey();
534 final MultiAction<Row> multiAction = e.getValue();
535 incTaskCounters(multiAction.getRegions(), loc.getServerName());
536 Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
537 @Override
538 public void run() {
539 MultiResponse res;
540 try {
541 MultiServerCallable<Row> callable = createCallable(loc, multiAction);
542 try {
543 res = createCaller(callable).callWithoutRetries(callable, timeout);
544 } catch (IOException e) {
545
546
547 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
548 errorsByServer);
549 return;
550 } catch (Throwable t) {
551
552 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
553 " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
554 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
555 errorsByServer);
556 return;
557 }
558
559
560 receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
561
562 } finally {
563 decTaskCounters(multiAction.getRegions(), loc.getServerName());
564 }
565 }
566 });
567
568 try {
569 this.pool.submit(runnable);
570 } catch (RejectedExecutionException ree) {
571
572
573 decTaskCounters(multiAction.getRegions(), loc.getServerName());
574 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
575 " Server is " + loc.getServerName(), ree);
576
577
578 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
579 }
580 }
581 }
582
583
584
585
586 protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
587 final MultiAction<Row> multi) {
588 return new MultiServerCallable<Row>(hConnection, tableName, location, this.rpcFactory, multi);
589 }
590
591
592
593
594
595
596 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
597 return rpcCallerFactory.<MultiResponse> newCaller();
598 }
599
600
601
602
603
604
605
606
607
608
609
610 private boolean manageError(int originalIndex, Row row, boolean canRetry,
611 Throwable throwable, HRegionLocation location) {
612 if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
613 canRetry = false;
614 }
615
616 byte[] region = null;
617 if (canRetry && callback != null) {
618 region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
619 canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
620 }
621
622 if (!canRetry) {
623 if (callback != null) {
624 if (region == null && location != null) {
625 region = location.getRegionInfo().getEncodedNameAsBytes();
626 }
627 callback.failure(originalIndex, region, row, throwable);
628 }
629 errors.add(throwable, row, location);
630 this.hasError.set(true);
631 }
632
633 return canRetry;
634 }
635
636
637
638
639
640
641
642
643
644
645 private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
646 HRegionLocation location, int numAttempt, Throwable t,
647 HConnectionManager.ServerErrorTracker errorsByServer) {
648
649
650 hConnection.updateCachedLocations(tableName,
651 rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
652 errorsByServer.reportServerError(location);
653 boolean canRetry = errorsByServer.canRetryMore(numAttempt);
654
655 List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
656 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
657 for (Action<Row> action : e.getValue()) {
658 if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, location)) {
659 toReplay.add(action);
660 }
661 }
662 }
663
664 logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
665 t, errorsByServer);
666 }
667
668
669
670
671
672 private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
673 List<Action<Row>> toReplay, int numAttempt, int failureCount,
674 Throwable throwable,
675 HConnectionManager.ServerErrorTracker errorsByServer) {
676 if (toReplay.isEmpty()) {
677
678 if (failureCount != 0) {
679
680 LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
681 oldLocation.getServerName(), throwable, -1, false,
682 errorsByServer.getStartTrackingTime()));
683 } else if (numAttempt > startLogErrorsCnt + 1) {
684
685 LOG.info(createLog(numAttempt, failureCount, 0,
686 oldLocation.getServerName(), throwable, -1, false,
687 errorsByServer.getStartTrackingTime()));
688 }
689 return;
690 }
691
692
693
694
695
696
697
698
699 long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
700
701 if (numAttempt > startLogErrorsCnt) {
702
703
704 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
705 oldLocation.getServerName(), throwable, backOffTime, true,
706 errorsByServer.getStartTrackingTime()));
707 }
708
709 try {
710 Thread.sleep(backOffTime);
711 } catch (InterruptedException e) {
712 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
713 Thread.currentThread().interrupt();
714 return;
715 }
716
717 submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
718 }
719
720
721
722
723
724
725
726
727
728
729 private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
730 HRegionLocation location,
731 MultiResponse responses, int numAttempt,
732 HConnectionManager.ServerErrorTracker errorsByServer) {
733 assert responses != null;
734
735
736
737
738
739
740
741 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
742 Throwable throwable = null;
743 int failureCount = 0;
744 boolean canRetry = true;
745
746 for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
747 responses.getResults().entrySet()) {
748
749 boolean regionFailureRegistered = false;
750 for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
751 Object result = regionResult.getSecond();
752
753
754 if (result == null || result instanceof Throwable) {
755 throwable = (Throwable) result;
756 Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
757 Row row = correspondingAction.getAction();
758 failureCount++;
759 if (!regionFailureRegistered) {
760 regionFailureRegistered= true;
761
762 hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
763 if (failureCount == 1) {
764 errorsByServer.reportServerError(location);
765 canRetry = errorsByServer.canRetryMore(numAttempt);
766 }
767 }
768
769 if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
770 throwable, location)) {
771 toReplay.add(correspondingAction);
772 }
773 } else {
774 if (callback != null) {
775 int index = regionResult.getFirst();
776 Action<Row> correspondingAction = initialActions.get(index);
777 Row row = correspondingAction.getAction();
778
779 this.callback.success(index, resultsForRS.getKey(), row, (CResult) result);
780 }
781 }
782 }
783 }
784
785
786
787
788 for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
789 throwable = throwableEntry.getValue();
790 byte[] region =throwableEntry.getKey();
791 List<Action<Row>> actions = multiAction.actions.get(region);
792 if (actions == null || actions.isEmpty()) {
793 throw new IllegalStateException("Wrong response for the region: " +
794 HRegionInfo.encodeRegionName(region));
795 }
796
797 if (failureCount == 0) {
798 errorsByServer.reportServerError(location);
799 canRetry = errorsByServer.canRetryMore(numAttempt);
800 }
801 hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
802 throwable, location);
803 failureCount += actions.size();
804
805 for (Action<Row> action : actions) {
806 Row row = action.getAction();
807 if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
808 toReplay.add(action);
809 }
810 }
811 }
812
813 logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
814 throwable, errorsByServer);
815 }
816
817 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
818 Throwable error, long backOffTime, boolean willRetry, String startTime){
819 StringBuilder sb = new StringBuilder();
820
821 sb.append("#").append(id).append(", table=").append(tableName).
822 append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
823
824 if (failureCount > 0 || error != null){
825 sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
826 append(error == null ? "null" : error);
827 } else {
828 sb.append("SUCCEEDED");
829 }
830
831 sb.append(" on ").append(sn);
832
833 sb.append(", tracking started ").append(startTime);
834
835 if (willRetry) {
836 sb.append(", retrying after ").append(backOffTime).append(" ms").
837 append(", replay ").append(replaySize).append(" ops.");
838 } else if (failureCount > 0) {
839 sb.append(" - FAILED, NOT RETRYING ANYMORE");
840 }
841
842 return sb.toString();
843 }
844
845
846
847
848
849 protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
850 synchronized (this.tasksDone) {
851 while (currentNumberOfTask == tasksDone.get()) {
852 try {
853 this.tasksDone.wait(100);
854 } catch (InterruptedException e) {
855 throw new InterruptedIOException("#" + id + ", interrupted." +
856 " currentNumberOfTask=" + currentNumberOfTask +
857 ", tableName=" + tableName + ", tasksDone=" + tasksDone.get());
858 }
859 }
860 }
861 }
862
863
864
865
866 private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
867 long lastLog = EnvironmentEdgeManager.currentTimeMillis();
868 long currentTasksDone = this.tasksDone.get();
869
870 while ((tasksSent.get() - currentTasksDone) > max) {
871 long now = EnvironmentEdgeManager.currentTimeMillis();
872 if (now > lastLog + 10000) {
873 lastLog = now;
874 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
875 + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
876 ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
877 " hasError=" + hasError() + ", tableName=" + tableName);
878 }
879 waitForNextTaskDone(currentTasksDone);
880 currentTasksDone = this.tasksDone.get();
881 }
882 }
883
884 private long getCurrentTasksCount(){
885 return tasksSent.get() - tasksDone.get();
886 }
887
888
889
890
891 public void waitUntilDone() throws InterruptedIOException {
892 waitForMaximumCurrentTasks(0);
893 }
894
895
896 public boolean hasError() {
897 return hasError.get();
898 }
899
900 public List<? extends Row> getFailedOperations() {
901 return errors.actions;
902 }
903
904
905
906
907 public void clearErrors() {
908 errors.clear();
909 hasError.set(false);
910 }
911
912 public RetriesExhaustedWithDetailsException getErrors() {
913 return errors.makeException();
914 }
915
916
917
918
919 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
920 tasksSent.incrementAndGet();
921
922 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
923 if (serverCnt == null) {
924 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
925 serverCnt = taskCounterPerServer.get(sn);
926 }
927 serverCnt.incrementAndGet();
928
929 for (byte[] regBytes : regions) {
930 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
931 if (regionCnt == null) {
932 regionCnt = new AtomicInteger();
933 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
934 if (oldCnt != null) {
935 regionCnt = oldCnt;
936 }
937 }
938 regionCnt.incrementAndGet();
939 }
940 }
941
942
943
944
945 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
946 for (byte[] regBytes : regions) {
947 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
948 regionCnt.decrementAndGet();
949 }
950
951 taskCounterPerServer.get(sn).decrementAndGet();
952
953 tasksDone.incrementAndGet();
954 synchronized (tasksDone) {
955 tasksDone.notifyAll();
956 }
957 }
958
959
960
961
962
963
964
965
966 protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
967 return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
968 }
969 }