1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.HashSet;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.locks.ReentrantLock;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.ProcedureInfo;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.hbase.classification.InterfaceStability;
46 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
47 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
48 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
49 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53 import org.apache.hadoop.hbase.security.User;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55 import org.apache.hadoop.hbase.util.NonceKey;
56 import org.apache.hadoop.hbase.util.Pair;
57
58 import com.google.common.base.Preconditions;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Private
74 @InterfaceStability.Evolving
75 public class ProcedureExecutor<TEnvironment> {
/** Logger used by the executor; the nested {@link Testing} helper logs through it as well. */
private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);

/**
 * Optional fault-injection hooks. While {@code null} (the default) the
 * "kill before store update" checks in the execute and rollback paths are skipped.
 */
Testing testing = null;
79 public static class Testing {
80 protected boolean killBeforeStoreUpdate = false;
81 protected boolean toggleKillBeforeStoreUpdate = false;
82
83 protected boolean shouldKillBeforeStoreUpdate() {
84 final boolean kill = this.killBeforeStoreUpdate;
85 if (this.toggleKillBeforeStoreUpdate) {
86 this.killBeforeStoreUpdate = !kill;
87 LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
88 }
89 return kill;
90 }
91 }
92
/**
 * Callback interface for procedure life-cycle events. Listeners are registered with
 * {@code registerListener()}; anything a callback throws is caught and logged by the
 * executor, so a failing listener does not stop the notification of the others.
 */
public interface ProcedureExecutorListener {
  /** Called for a runnable root procedure (one without a parent) re-queued while loading the store. */
  void procedureLoaded(long procId);
  /** Called by {@code submitProcedure()} once the new procedure is persisted and registered. */
  void procedureAdded(long procId);
  /** Called after the procedure's result has been moved to the completed map. */
  void procedureFinished(long procId);
}
98
99
100
101
102 private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
103 @Override
104 public long getTimeout(Procedure proc) {
105 return proc.getTimeRemaining();
106 }
107
108 @Override
109 public TimeUnit getTimeUnit(Procedure proc) {
110 return TimeUnit.MILLISECONDS;
111 }
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
130 private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
131
132 private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
133 private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000;
134
135 private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
136 private static final int DEFAULT_EVICT_TTL = 15 * 60000;
137
138 private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
139 private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000;
140
141 private final Map<Long, ProcedureInfo> completed;
142 private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
143 private final ProcedureStore store;
144 private final Configuration conf;
145
146 public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
147 final Map<Long, ProcedureInfo> completedMap,
148 final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
149
150 setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
151 this.completed = completedMap;
152 this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
153 this.store = store;
154 this.conf = conf;
155 }
156
157 public void periodicExecute(final TEnvironment env) {
158 if (completed.isEmpty()) {
159 if (LOG.isTraceEnabled()) {
160 LOG.trace("No completed procedures to cleanup.");
161 }
162 return;
163 }
164
165 final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
166 final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
167
168 final long now = EnvironmentEdgeManager.currentTime();
169 final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
170 final boolean isDebugEnabled = LOG.isDebugEnabled();
171 while (it.hasNext() && store.isRunning()) {
172 final Map.Entry<Long, ProcedureInfo> entry = it.next();
173 final ProcedureInfo procInfo = entry.getValue();
174
175
176 if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
177 (now - procInfo.getLastUpdate()) >= evictTtl) {
178 if (isDebugEnabled) {
179 LOG.debug("Evict completed procedure: " + procInfo);
180 }
181 store.delete(entry.getKey());
182 it.remove();
183
184 NonceKey nonceKey = procInfo.getNonceKey();
185 if (nonceKey != null) {
186 nonceKeysToProcIdsMap.remove(nonceKey);
187 }
188 }
189 }
190 }
191
192 @Override
193 protected Procedure[] execute(final TEnvironment env) {
194 throw new UnsupportedOperationException();
195 }
196
197 @Override
198 protected void rollback(final TEnvironment env) {
199 throw new UnsupportedOperationException();
200 }
201
202 @Override
203 protected boolean abort(final TEnvironment env) {
204 throw new UnsupportedOperationException();
205 }
206
207 @Override
208 public void serializeStateData(final OutputStream stream) {
209 throw new UnsupportedOperationException();
210 }
211
212 @Override
213 public void deserializeStateData(final InputStream stream) {
214 throw new UnsupportedOperationException();
215 }
216 }
217
218
219
220
221
222
/**
 * Results of finished procedures, keyed by procedure id. Filled while loading the store
 * and by procedureFinished(); entries are evicted by the CompletedProcedureCleaner.
 */
private final ConcurrentHashMap<Long, ProcedureInfo> completed =
  new ConcurrentHashMap<Long, ProcedureInfo>();

/**
 * Per-root-procedure execution/rollback state, keyed by the root procedure id.
 * An entry is created on submit (or load) and removed by procedureFinished().
 */
private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
  new ConcurrentHashMap<Long, RootProcedureState>();

/**
 * Procedures that are not completed yet (roots and children), keyed by procedure id.
 */
private final ConcurrentHashMap<Long, Procedure> procedures =
  new ConcurrentHashMap<Long, Procedure>();

/**
 * Maps the nonce of a submitted procedure to its id, so that a re-submission with the
 * same nonce returns the existing id instead of creating a new procedure.
 */
private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
  new ConcurrentHashMap<NonceKey, Long>();

/**
 * Procedures waiting for a timeout, polled by timeoutLoop(). Also holds the
 * CompletedProcedureCleaner, which uses its timeout as a run interval.
 */
private final TimeoutBlockingQueue<Procedure> waitingTimeout =
  new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());

/**
 * Queue of procedures ready to run, polled by the worker threads in execLoop().
 */
private final ProcedureRunnableSet runnables;

// NOTE(review): submitLock is not referenced by any code visible in this class.
private final ReentrantLock submitLock = new ReentrantLock();
// Last procedure id handed out; -1 until the store has been loaded (and again after join()).
private final AtomicLong lastProcId = new AtomicLong(-1);

// Listeners notified of procedure loaded/added/finished events.
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
  new CopyOnWriteArrayList<ProcedureExecutorListener>();

// Number of worker threads currently inside execLoop(proc).
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
// True between start() and stop(); the worker and timeout loops exit once it is false.
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;
private final Configuration conf;

// Worker threads followed by one timeout thread (last slot); null until start() is called.
private Thread[] threads;
274
/**
 * Creates an executor that schedules runnable procedures with a
 * {@link ProcedureSimpleRunQueue}.
 *
 * @param conf configuration handed to the completed-procedure cleaner on start
 * @param environment environment object passed to every procedure callback
 * @param store store used to persist procedure state
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}
279
/**
 * Creates an executor with a caller-supplied run queue. Nothing is loaded or started
 * here; call {@code start()} to recover the store and spawn the threads.
 *
 * @param conf configuration handed to the completed-procedure cleaner on start
 * @param environment environment object passed to every procedure callback
 * @param store store used to persist procedure state
 * @param runqueue queue from which the worker threads pick runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.environment = environment;
  this.runnables = runqueue;
}
287
288 private void load(final boolean abortOnCorruption) throws IOException {
289 Preconditions.checkArgument(completed.isEmpty());
290 Preconditions.checkArgument(rollbackStack.isEmpty());
291 Preconditions.checkArgument(procedures.isEmpty());
292 Preconditions.checkArgument(waitingTimeout.isEmpty());
293 Preconditions.checkArgument(runnables.size() == 0);
294
295 store.load(new ProcedureStore.ProcedureLoader() {
296 @Override
297 public void setMaxProcId(long maxProcId) {
298 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
299 LOG.debug("load procedures maxProcId=" + maxProcId);
300 lastProcId.set(maxProcId);
301 }
302
303 @Override
304 public void load(ProcedureIterator procIter) throws IOException {
305 loadProcedures(procIter, abortOnCorruption);
306 }
307
308 @Override
309 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
310 int corruptedCount = 0;
311 while (procIter.hasNext()) {
312 ProcedureInfo proc = procIter.nextAsProcedureInfo();
313 LOG.error("corrupted procedure: " + proc);
314 corruptedCount++;
315 }
316 if (abortOnCorruption && corruptedCount > 0) {
317 throw new IOException("found " + corruptedCount + " procedures on replay");
318 }
319 }
320 });
321 }
322
323 private void loadProcedures(final ProcedureIterator procIter,
324 final boolean abortOnCorruption) throws IOException {
325 final boolean isDebugEnabled = LOG.isDebugEnabled();
326
327
328 int runnablesCount = 0;
329 while (procIter.hasNext()) {
330 final NonceKey nonceKey;
331 final long procId;
332
333 if (procIter.isNextCompleted()) {
334 ProcedureInfo proc = procIter.nextAsProcedureInfo();
335 nonceKey = proc.getNonceKey();
336 procId = proc.getProcId();
337 completed.put(proc.getProcId(), proc);
338 if (isDebugEnabled) {
339 LOG.debug("The procedure is completed: " + proc);
340 }
341 } else {
342 Procedure proc = procIter.nextAsProcedure();
343 nonceKey = proc.getNonceKey();
344 procId = proc.getProcId();
345
346 if (!proc.hasParent()) {
347 assert !proc.isFinished() : "unexpected finished procedure";
348 rollbackStack.put(proc.getProcId(), new RootProcedureState());
349 }
350
351
352 proc.beforeReplay(getEnvironment());
353 procedures.put(proc.getProcId(), proc);
354
355 if (proc.getState() == ProcedureState.RUNNABLE) {
356 runnablesCount++;
357 }
358 }
359
360
361 if (nonceKey != null) {
362 nonceKeysToProcIdsMap.put(nonceKey, procId);
363 }
364 }
365
366
367 ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
368 HashSet<Procedure> waitingSet = null;
369 procIter.reset();
370 while (procIter.hasNext()) {
371 if (procIter.isNextCompleted()) {
372 procIter.skipNext();
373 continue;
374 }
375
376 Procedure proc = procIter.nextAsProcedure();
377 assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
378
379 if (isDebugEnabled) {
380 LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
381 proc.getState(), proc.hasException(), proc));
382 }
383
384 Long rootProcId = getRootProcedureId(proc);
385 if (rootProcId == null) {
386
387 runnables.addBack(proc);
388 continue;
389 }
390
391 if (proc.hasParent() && !proc.isFinished()) {
392 Procedure parent = procedures.get(proc.getParentProcId());
393
394 if (parent != null) {
395 parent.incChildrenLatch();
396 }
397 }
398
399 RootProcedureState procStack = rollbackStack.get(rootProcId);
400 procStack.loadStack(proc);
401
402 switch (proc.getState()) {
403 case RUNNABLE:
404 runnableList.add(proc);
405 break;
406 case WAITING_TIMEOUT:
407 if (waitingSet == null) {
408 waitingSet = new HashSet<Procedure>();
409 }
410 waitingSet.add(proc);
411 break;
412 case FINISHED:
413 if (proc.hasException()) {
414
415 runnables.addBack(proc);
416 break;
417 }
418 case ROLLEDBACK:
419 case INITIALIZING:
420 String msg = "Unexpected " + proc.getState() + " state for " + proc;
421 LOG.error(msg);
422 throw new UnsupportedOperationException(msg);
423 default:
424 break;
425 }
426 }
427
428
429 int corruptedCount = 0;
430 Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
431 while (itStack.hasNext()) {
432 Map.Entry<Long, RootProcedureState> entry = itStack.next();
433 RootProcedureState procStack = entry.getValue();
434 if (procStack.isValid()) continue;
435
436 for (Procedure proc: procStack.getSubprocedures()) {
437 LOG.error("corrupted procedure: " + proc);
438 procedures.remove(proc.getProcId());
439 runnableList.remove(proc);
440 if (waitingSet != null) waitingSet.remove(proc);
441 corruptedCount++;
442 }
443 itStack.remove();
444 }
445
446 if (abortOnCorruption && corruptedCount > 0) {
447 throw new IOException("found " + corruptedCount + " procedures on replay");
448 }
449
450
451 if (!runnableList.isEmpty()) {
452
453
454 for (int i = runnableList.size() - 1; i >= 0; --i) {
455 Procedure proc = runnableList.get(i);
456 if (!proc.hasParent()) {
457 sendProcedureLoadedNotification(proc.getProcId());
458 }
459 if (proc.wasExecuted()) {
460 runnables.addFront(proc);
461 } else {
462
463 runnables.addBack(proc);
464 }
465 }
466 }
467 }
468
469
470
471
472
473
474
475
476
477
478
479 public void start(int numThreads, boolean abortOnCorruption) throws IOException {
480 if (running.getAndSet(true)) {
481 LOG.warn("Already running");
482 return;
483 }
484
485
486
487 threads = new Thread[numThreads + 1];
488 LOG.info("Starting procedure executor threads=" + threads.length);
489
490
491 for (int i = 0; i < numThreads; ++i) {
492 threads[i] = new Thread("ProcedureExecutor-" + i) {
493 @Override
494 public void run() {
495 execLoop();
496 }
497 };
498 }
499
500
501 threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
502 @Override
503 public void run() {
504 timeoutLoop();
505 }
506 };
507
508
509 store.recoverLease();
510
511
512
513
514
515
516 load(abortOnCorruption);
517
518
519 for (int i = 0; i < threads.length; ++i) {
520 threads[i].start();
521 }
522
523
524 waitingTimeout.add(
525 new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
526 }
527
528 public void stop() {
529 if (!running.getAndSet(false)) {
530 return;
531 }
532
533 LOG.info("Stopping the procedure executor");
534 runnables.signalAll();
535 waitingTimeout.signalAll();
536 }
537
538 public void join() {
539 boolean interrupted = false;
540
541 for (int i = 0; i < threads.length; ++i) {
542 try {
543 threads[i].join();
544 } catch (InterruptedException ex) {
545 interrupted = true;
546 }
547 }
548
549 if (interrupted) {
550 Thread.currentThread().interrupt();
551 }
552
553 completed.clear();
554 rollbackStack.clear();
555 procedures.clear();
556 nonceKeysToProcIdsMap.clear();
557 waitingTimeout.clear();
558 runnables.clear();
559 lastProcId.set(-1);
560 }
561
/** @return true between a successful call to start() and the following stop() */
public boolean isRunning() {
  return running.get();
}
565
566
567
568
569 public int getNumThreads() {
570 return threads == null ? 0 : (threads.length - 1);
571 }
572
/** @return the number of worker threads currently busy with a procedure */
public int getActiveExecutorCount() {
  return activeExecutorCount.get();
}
576
/** @return the environment object given at construction time */
public TEnvironment getEnvironment() {
  return this.environment;
}
580
/** @return the procedure store given at construction time */
public ProcedureStore getStore() {
  return this.store;
}
584
/** Adds {@code listener} to the listeners notified of procedure events. */
public void registerListener(ProcedureExecutorListener listener) {
  this.listeners.add(listener);
}
588
/**
 * Removes {@code listener} from the registered listeners.
 *
 * @return true if the listener was registered
 */
public boolean unregisterListener(ProcedureExecutorListener listener) {
  return this.listeners.remove(listener);
}
592
593
594
595
596
597 public List<ProcedureInfo> listProcedures() {
598 List<ProcedureInfo> procedureLists =
599 new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
600 for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
601 procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
602 }
603 for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
604
605
606
607
608 procedureLists.add(e.getValue());
609 }
610 return procedureLists;
611 }
612
613
614
615
616
617
/**
 * Submits a new root procedure without a nonce.
 *
 * @param proc the procedure to execute
 * @return the id assigned to the procedure
 */
public long submitProcedure(final Procedure proc) {
  return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
621
622
623
624
625
626
627
628
629 public long submitProcedure(
630 final Procedure proc,
631 final long nonceGroup,
632 final long nonce) {
633 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
634 Preconditions.checkArgument(isRunning());
635 Preconditions.checkArgument(lastProcId.get() >= 0);
636 Preconditions.checkArgument(!proc.hasParent());
637
638 Long currentProcId;
639
640
641
642 synchronized (this) {
643
644
645
646 NonceKey noncekey = null;
647 if (nonce != HConstants.NO_NONCE) {
648 noncekey = new NonceKey(nonceGroup, nonce);
649 currentProcId = nonceKeysToProcIdsMap.get(noncekey);
650 if (currentProcId != null) {
651
652 return currentProcId;
653 }
654 }
655
656
657 currentProcId = nextProcId();
658 proc.setProcId(currentProcId);
659
660
661 if (noncekey != null) {
662 proc.setNonceKey(noncekey);
663 nonceKeysToProcIdsMap.put(noncekey, currentProcId);
664 }
665 }
666
667
668 store.insert(proc, null);
669 if (LOG.isDebugEnabled()) {
670 LOG.debug("Procedure " + proc + " added to the store.");
671 }
672
673
674 RootProcedureState stack = new RootProcedureState();
675 rollbackStack.put(currentProcId, stack);
676
677
678 assert !procedures.containsKey(currentProcId);
679 procedures.put(currentProcId, proc);
680 sendProcedureAddedNotification(currentProcId);
681 runnables.addBack(proc);
682 return currentProcId;
683 }
684
/**
 * @param procId id of the procedure
 * @return the result of the completed procedure, or null if {@code procId} is not in
 *         the completed map (still pending, unknown, or already evicted)
 */
public ProcedureInfo getResult(final long procId) {
  return completed.get(procId);
}
688
689
690
691
692
693
694
695
/**
 * @param procId id of the procedure
 * @return true if a result for {@code procId} is present in the completed map
 */
public boolean isFinished(final long procId) {
  return completed.containsKey(procId);
}
699
700
701
702
703
704
705 public boolean isStarted(final long procId) {
706 Procedure proc = procedures.get(procId);
707 if (proc == null) {
708 return completed.get(procId) != null;
709 }
710 return proc.wasExecuted();
711 }
712
713
714
715
716
717 public void removeResult(final long procId) {
718 ProcedureInfo result = completed.get(procId);
719 if (result == null) {
720 assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
721 if (LOG.isDebugEnabled()) {
722 LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
723 }
724 return;
725 }
726
727
728 result.setClientAckTime(EnvironmentEdgeManager.currentTime());
729 }
730
731
732
733
734
735
736
/**
 * Requests the abort of a pending procedure, even if it already started executing.
 *
 * @param procId id of the procedure to abort
 * @return the outcome of {@code abort(procId, true)}
 */
public boolean abort(final long procId) {
  return abort(procId, true);
}
740
741
742
743
744
745
746
747
748 public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
749 Procedure proc = procedures.get(procId);
750 if (proc != null) {
751 if (!mayInterruptIfRunning && proc.wasExecuted()) {
752 return false;
753 } else {
754 return proc.abort(getEnvironment());
755 }
756 }
757 return false;
758 }
759
760
761
762
763
764
765
766
767 public boolean isProcedureOwner(final long procId, final User user) {
768 if (user == null) {
769 return false;
770 }
771
772 Procedure proc = procedures.get(procId);
773 if (proc != null) {
774 return proc.getOwner().equals(user.getShortName());
775 }
776 ProcedureInfo procInfo = completed.get(procId);
777 if (procInfo == null) {
778
779
780 return false;
781 }
782 return ProcedureInfo.isProcedureOwner(procInfo, user);
783 }
784
/** @return a read-only view (not a copy) of the completed-procedures map */
public Map<Long, ProcedureInfo> getResults() {
  return Collections.unmodifiableMap(completed);
}
788
/** @return the pending procedure with the given id, or null if there is none */
public Procedure getProcedure(final long procId) {
  return procedures.get(procId);
}
792
/** @return the queue of runnable procedures used by this executor */
protected ProcedureRunnableSet getRunnableSet() {
  return runnables;
}
796
797
798
799
800
801
802 private void execLoop() {
803 while (isRunning()) {
804 Procedure proc = runnables.poll();
805 if (proc == null) continue;
806
807 try {
808 activeExecutorCount.incrementAndGet();
809 execLoop(proc);
810 } finally {
811 activeExecutorCount.decrementAndGet();
812 }
813 }
814 }
815
/**
 * Runs one scheduling step for {@code proc}: either executes it, or - when its root
 * stack refuses the execution - drives the rollback.
 *
 * @param proc the procedure picked from the run queue
 */
private void execLoop(Procedure proc) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Trying to start the execution of " + proc);
  }

  Long rootProcId = getRootProcedureId(proc);
  if (rootProcId == null) {
    // no root could be resolved for this procedure: just roll it back
    executeRollback(proc);
    return;
  }

  // the stack is gone once the root is finished: nothing left to do
  RootProcedureState procStack = rollbackStack.get(rootProcId);
  if (procStack == null) return;

  do {
    // the stack refused the execution of this procedure
    // (presumably because the root is failed or rolling back - confirm in RootProcedureState)
    if (!procStack.acquire(proc)) {
      if (procStack.setRollback()) {
        // we are allowed to roll back the whole stack; if it cannot complete now,
        // undo the rollback flag and retry later
        if (!executeRollback(rootProcId, procStack)) {
          procStack.unsetRollback();
          runnables.yield(proc);
        }
      } else {
        // the whole-stack rollback cannot start from here; a procedure that was
        // never executed can still be rolled back on its own
        if (!proc.wasExecuted()) {
          if (!executeRollback(proc)) {
            runnables.yield(proc);
          }
        }
      }
      break;
    }

    // execute the procedure under its own lock; if the lock is busy, retry later
    assert proc.getState() == ProcedureState.RUNNABLE;
    if (proc.acquireLock(getEnvironment())) {
      execProcedure(procStack, proc);
      proc.releaseLock(getEnvironment());
    } else {
      runnables.yield(proc);
    }
    procStack.release(proc);

    // testing hook: when the test hooks are installed and the executor was stopped
    // meanwhile, leave without completing the procedure
    if (testing != null && !isRunning()) {
      break;
    }

    if (proc.isSuccess()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Procedure completed in " +
            StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
      }
      // only the completion of the root publishes a result
      if (proc.getProcId() == rootProcId) {
        procedureFinished(proc);
      }
      break;
    }
    // loop again only if the stack failed meanwhile, so the rollback starts right away
  } while (procStack.isFailed());
}
882
/**
 * Body of the timeout thread: polls the timeout queue until the executor is stopped.
 * The CompletedProcedureCleaner is run and re-queued every time it comes out of the
 * queue; any other procedure is marked as timed out and handed back to the workers.
 */
private void timeoutLoop() {
  while (isRunning()) {
    Procedure proc = waitingTimeout.poll();
    if (proc == null) continue;

    if (proc.getTimeRemaining() > 100) {
      // more than 100 (ms, per ProcedureTimeoutRetriever) still to go:
      // not expired yet, put it back in the queue
      waitingTimeout.add(proc);
      continue;
    }

    // The cleaner is not a real procedure: its "timeout" is the interval between two
    // runs. Run it, restart its clock and queue it again; failures are only logged.
    if (proc instanceof CompletedProcedureCleaner) {
      try {
        ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
      } catch (Throwable e) {
        LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
      }
      proc.setStartTime(EnvironmentEdgeManager.currentTime());
      waitingTimeout.add(proc);
      continue;
    }

    // The procedure timed out: if it accepts the timeout failure, abort its root
    // stack, persist the new state and queue it so a worker starts the rollback.
    // NOTE(review): the root id is unboxed into a long and procStack is dereferenced
    // unchecked - verify both lookups can never return null here.
    if (proc.setTimeoutFailure()) {
      long rootProcId = Procedure.getRootProcedureId(procedures, proc);
      RootProcedureState procStack = rollbackStack.get(rootProcId);
      procStack.abort();
      store.update(proc);
      runnables.addFront(proc);
      continue;
    }
  }
}
929
930
931
932
933
934
/**
 * Rolls back the whole stack of a root procedure, from the last executed step down to
 * the first one. Rolled-back steps are removed from the stack, so an interrupted
 * rollback resumes where it stopped.
 *
 * @param rootProcId id of the root procedure
 * @param procStack execution state of the root procedure
 * @return true if the rollback completed and the root was marked as finished; false if
 *         it has to be retried (lock not available, step failed, executor or store
 *         stopping, or the procedure asked to yield)
 */
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
  Procedure rootProc = procedures.get(rootProcId);
  RemoteProcedureException exception = rootProc.getException();
  if (exception == null) {
    // the failure was recorded on the stack only: copy it to the root and persist it
    exception = procStack.getException();
    rootProc.setFailure(exception);
    store.update(rootProc);
  }

  List<Procedure> subprocStack = procStack.getSubprocedures();
  assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

  int stackTail = subprocStack.size();
  boolean reuseLock = false;
  while (stackTail --> 0) {
    final Procedure proc = subprocStack.get(stackTail);

    if (!reuseLock && !proc.acquireLock(getEnvironment())) {
      // cannot take the lock of this step now: give up, the caller retries later
      return false;
    }

    boolean abortRollback = !executeRollback(proc);
    abortRollback |= !isRunning() || !store.isRunning();

    // If the next step to roll back belongs to the same procedure, keep its lock
    // instead of releasing and re-acquiring it.
    reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
    if (!reuseLock) {
      proc.releaseLock(getEnvironment());
    }

    // the step failed or we are shutting down: stop here; the step stays on the
    // stack, so it is attempted again on the next rollback
    if (abortRollback) {
      return false;
    }

    subprocStack.remove(stackTail);

    // the procedure asked to yield after each step: let the caller reschedule
    if (proc.isYieldAfterExecutionStep(getEnvironment())) {
      return false;
    }
  }

  // every step was rolled back: the root procedure is done
  LOG.info("Rolledback procedure " + rootProc +
           " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
           " exception=" + exception.getMessage());
  procedureFinished(rootProc);
  return true;
}
990
991
992
993
994
995
/**
 * Rolls back a single step of {@code proc} and persists the outcome.
 *
 * @param proc the procedure to roll back
 * @return true if the step was rolled back and the store updated; false if the
 *         rollback threw an IOException or was interrupted, or a test hook stopped
 *         the executor before the store update
 */
private boolean executeRollback(final Procedure proc) {
  try {
    proc.doRollback(getEnvironment());
  } catch (IOException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("rollback attempt failed for " + proc, e);
    }
    return false;
  } catch (InterruptedException e) {
    handleInterruptedException(proc, e);
    return false;
  } catch (Throwable e) {
    // Unexpected failure: only logged. Execution continues below, i.e. the step is
    // treated as rolled back.
    LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
  }

  // testing hook: simulate a kill between the rollback and the store update
  if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
    LOG.debug("TESTING: Kill before store update");
    stop();
    return false;
  }

  if (proc.removeStackIndex()) {
    // removeStackIndex() returned true (presumably: no executed step left - confirm in
    // Procedure): the procedure is fully rolled back. A child is deleted and
    // forgotten, a root is only updated here.
    proc.setState(ProcedureState.ROLLEDBACK);
    if (proc.hasParent()) {
      store.delete(proc.getProcId());
      procedures.remove(proc.getProcId());
    } else {
      store.update(proc);
    }
  } else {
    // more steps of this procedure are still on the stack: persist the progress
    store.update(proc);
  }

  return true;
}
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
/**
 * Executes {@code procedure} (one or more steps) and persists the outcome.
 *
 * Each iteration calls {@code doExecute()} and interprets the returned array:
 * - null or empty: no children; unless the procedure moved itself to
 *   WAITING_TIMEOUT it is marked FINISHED;
 * - a single element that is the procedure itself: execute another step right away;
 * - anything else: the elements are new children; the procedure waits for them.
 * The step is then recorded on the root stack and written to the store. After the
 * loop, new children are registered and queued, and a finished child counts down its
 * parent, which is resumed once all its children are done.
 *
 * @param procStack execution state of the root of {@code procedure}
 * @param procedure the procedure to execute; must be RUNNABLE
 */
private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
  Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);

  // Execute the procedure, step after step while it asks to be re-executed
  boolean reExecute = false;
  Procedure[] subprocs = null;
  do {
    reExecute = false;
    try {
      subprocs = procedure.doExecute(getEnvironment());
      // an empty array is the same as no children at all
      if (subprocs != null && subprocs.length == 0) {
        subprocs = null;
      }
    } catch (ProcedureYieldException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
      }
      runnables.yield(procedure);
      return;
    } catch (InterruptedException e) {
      handleInterruptedException(procedure, e);
      runnables.yield(procedure);
      return;
    } catch (Throwable e) {
      // anything else is a bug in the procedure: mark it as failed
      String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
      LOG.error(msg, e);
      procedure.setFailure(new RemoteProcedureException(msg, e));
    }

    if (!procedure.isFailed()) {
      if (subprocs != null) {
        if (subprocs.length == 1 && subprocs[0] == procedure) {
          // the procedure returned itself: it has another step to run
          subprocs = null;
          reExecute = true;
        } else {
          // initialize the new children: link them to the parent and assign ids
          for (int i = 0; i < subprocs.length; ++i) {
            Procedure subproc = subprocs[i];
            if (subproc == null) {
              String msg = "subproc[" + i + "] is null, aborting the procedure";
              procedure.setFailure(new RemoteProcedureException(msg,
                new IllegalArgumentIOException(msg)));
              subprocs = null;
              break;
            }

            assert subproc.getState() == ProcedureState.INITIALIZING;
            subproc.setParentProcId(procedure.getProcId());
            subproc.setProcId(nextProcId());
          }

          if (!procedure.isFailed()) {
            // the parent waits for all its children (or for its timeout)
            procedure.setChildrenLatch(subprocs.length);
            switch (procedure.getState()) {
              case RUNNABLE:
                procedure.setState(ProcedureState.WAITING);
                break;
              case WAITING_TIMEOUT:
                waitingTimeout.add(procedure);
                break;
              default:
                break;
            }
          }
        }
      } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
        waitingTimeout.add(procedure);
      } else {
        // no children and nothing to wait for: the procedure is done
        procedure.setState(ProcedureState.FINISHED);
      }
    }

    // record the step, so that it can be rolled back
    procStack.addRollbackStep(procedure);

    // testing hook: simulate a kill between the execution and the store update
    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
      LOG.debug("TESTING: Kill before store update");
      stop();
      return;
    }

    // persist the step: the new children are inserted together with the parent
    if (subprocs != null && !procedure.isFailed()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
      }
      store.insert(procedure, subprocs);
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store update " + procedure);
      }
      store.update(procedure);
    }

    // the store is not running (anymore): do not go any further
    if (!store.isRunning()) {
      return;
    }

    // a still-runnable procedure may ask to yield between two steps
    if (procedure.getState() == ProcedureState.RUNNABLE &&
        procedure.isYieldAfterExecutionStep(getEnvironment())) {
      runnables.yield(procedure);
      return;
    }

    assert (reExecute && subprocs == null) || !reExecute;
  } while (reExecute);

  // register the new children and queue them in front of everything else
  if (subprocs != null && !procedure.isFailed()) {
    for (int i = 0; i < subprocs.length; ++i) {
      Procedure subproc = subprocs[i];
      assert !procedures.containsKey(subproc.getProcId());
      procedures.put(subproc.getProcId(), subproc);
      runnables.addFront(subproc);
    }
  }

  if (procedure.isFinished() && procedure.hasParent()) {
    Procedure parent = procedures.get(procedure.getParentProcId());
    if (parent == null) {
      // the parent is already gone: expected only while the stack is rolling back
      assert procStack.isRollingback();
      return;
    }

    // a finished child counts down its parent; the last one wakes the parent up
    if (LOG.isTraceEnabled()) {
      LOG.trace(parent + " child is done: " + procedure);
    }
    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
      parent.setState(ProcedureState.RUNNABLE);
      store.update(parent);
      runnables.addFront(parent);
      if (LOG.isTraceEnabled()) {
        LOG.trace(parent + " all the children finished their work, resume.");
      }
      return;
    }
  }
}
1198
/**
 * Records (at trace level) that {@code proc} was interrupted. Nothing else is done
 * here: the interrupt status of the thread is not restored and the callers decide
 * how to reschedule the procedure.
 * NOTE(review): not re-interrupting looks deliberate (the worker keeps running other
 * procedures) - confirm before changing.
 */
private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
  }
}
1210
1211 private void sendProcedureLoadedNotification(final long procId) {
1212 if (!this.listeners.isEmpty()) {
1213 for (ProcedureExecutorListener listener: this.listeners) {
1214 try {
1215 listener.procedureLoaded(procId);
1216 } catch (Throwable e) {
1217 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1218 }
1219 }
1220 }
1221 }
1222
1223 private void sendProcedureAddedNotification(final long procId) {
1224 if (!this.listeners.isEmpty()) {
1225 for (ProcedureExecutorListener listener: this.listeners) {
1226 try {
1227 listener.procedureAdded(procId);
1228 } catch (Throwable e) {
1229 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1230 }
1231 }
1232 }
1233 }
1234
1235 private void sendProcedureFinishedNotification(final long procId) {
1236 if (!this.listeners.isEmpty()) {
1237 for (ProcedureExecutorListener listener: this.listeners) {
1238 try {
1239 listener.procedureFinished(procId);
1240 } catch (Throwable e) {
1241 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1242 }
1243 }
1244 }
1245 }
1246
1247 private long nextProcId() {
1248 long procId = lastProcId.incrementAndGet();
1249 if (procId < 0) {
1250 while (!lastProcId.compareAndSet(procId, 0)) {
1251 procId = lastProcId.get();
1252 if (procId >= 0)
1253 break;
1254 }
1255 while (procedures.containsKey(procId)) {
1256 procId = lastProcId.incrementAndGet();
1257 }
1258 }
1259 return procId;
1260 }
1261
/**
 * Resolves the root of {@code proc} against the map of pending procedures.
 *
 * @return the id of the root procedure; callers treat a null result as
 *         "no root could be resolved"
 */
private Long getRootProcedureId(Procedure proc) {
  return Procedure.getRootProcedureId(procedures, proc);
}
1265
1266 private void procedureFinished(final Procedure proc) {
1267
1268 try {
1269 proc.completionCleanup(getEnvironment());
1270 } catch (Throwable e) {
1271
1272 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1273 }
1274
1275
1276 ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
1277 if (!proc.shouldWaitClientAck(getEnvironment())) {
1278 procInfo.setClientAckTime(0);
1279 }
1280
1281 completed.put(procInfo.getProcId(), procInfo);
1282 rollbackStack.remove(proc.getProcId());
1283 procedures.remove(proc.getProcId());
1284
1285
1286 try {
1287 runnables.completionCleanup(proc);
1288 } catch (Throwable e) {
1289
1290 LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
1291 }
1292
1293
1294 sendProcedureFinishedNotification(proc.getProcId());
1295 }
1296
1297 public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1298 ProcedureInfo result = completed.get(procId);
1299 Procedure proc = null;
1300 if (result == null) {
1301 proc = procedures.get(procId);
1302 if (proc == null) {
1303 result = completed.get(procId);
1304 }
1305 }
1306 return new Pair(result, proc);
1307 }
1308 }