1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.Modifier;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.apache.hadoop.hbase.util.NonceKey;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.base.Preconditions;
44 import com.google.protobuf.ByteString;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 @InterfaceStability.Evolving
67 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68
69 private String owner = null;
70 private Long parentProcId = null;
71 private Long procId = null;
72 private long startTime;
73
74
75 private ProcedureState state = ProcedureState.INITIALIZING;
76 private Integer timeout = null;
77 private int[] stackIndexes = null;
78 private int childrenLatch = 0;
79 private long lastUpdate;
80
81 private RemoteProcedureException exception = null;
82 private byte[] result = null;
83
84 private NonceKey nonceKey = null;
85
86
87
88
89
90
91
92
93
94
95 protected abstract Procedure[] execute(TEnvironment env)
96 throws ProcedureYieldException, InterruptedException;
97
98
99
100
101
102
103
104
105
106
107
108
109 protected abstract void rollback(TEnvironment env)
110 throws IOException, InterruptedException;
111
112
113
114
115
116
117
118
119
120
121
122
123
124 protected abstract boolean abort(TEnvironment env);
125
126
127
128
129
130
131 protected abstract void serializeStateData(final OutputStream stream)
132 throws IOException;
133
134
135
136
137
138
139 protected abstract void deserializeStateData(final InputStream stream)
140 throws IOException;
141
142
143
144
145
146
147
148
149
150
151 protected boolean acquireLock(final TEnvironment env) {
152 return true;
153 }
154
155
156
157
158 protected void releaseLock(final TEnvironment env) {
159
160 }
161
162
163
164
165
166
167
168 protected void beforeReplay(final TEnvironment env) {
169
170 }
171
172
173
174
175
176
177 protected void completionCleanup(final TEnvironment env) {
178
179 }
180
181
182
183
184
185
186
187
188
189 protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
190 return false;
191 }
192
193
194
195
196
197
198
199
200
201
202 protected boolean shouldWaitClientAck(final TEnvironment env) {
203 return true;
204 }
205
206 @Override
207 public String toString() {
208 StringBuilder sb = new StringBuilder();
209 toStringClassDetails(sb);
210
211 if (procId != null) {
212 sb.append(" id=");
213 sb.append(getProcId());
214 }
215
216 if (hasParent()) {
217 sb.append(" parent=");
218 sb.append(getParentProcId());
219 }
220
221 if (hasOwner()) {
222 sb.append(" owner=");
223 sb.append(getOwner());
224 }
225
226 sb.append(" state=");
227 toStringState(sb);
228 return sb.toString();
229 }
230
231 protected String toStringClass() {
232 StringBuilder sb = new StringBuilder();
233 toStringClassDetails(sb);
234
235 return sb.toString();
236 }
237
238
239
240
241
242 protected void toStringState(StringBuilder builder) {
243 builder.append(getState());
244 }
245
246
247
248
249
250
251 protected void toStringClassDetails(StringBuilder builder) {
252 builder.append(getClass().getName());
253 }
254
255
256
257
258 public byte[] getResult() {
259 return result;
260 }
261
262
263
264
265
266 protected void setResult(final byte[] result) {
267 this.result = result;
268 }
269
270 public long getProcId() {
271 return procId;
272 }
273
274 public boolean hasParent() {
275 return parentProcId != null;
276 }
277
278 public boolean hasException() {
279 return exception != null;
280 }
281
282 public boolean hasTimeout() {
283 return timeout != null;
284 }
285
286 public long getParentProcId() {
287 return parentProcId;
288 }
289
290 public NonceKey getNonceKey() {
291 return nonceKey;
292 }
293
294
295
296
297
298 public synchronized boolean isFailed() {
299 return exception != null || state == ProcedureState.ROLLEDBACK;
300 }
301
302
303
304
305 public synchronized boolean isSuccess() {
306 return state == ProcedureState.FINISHED && exception == null;
307 }
308
309
310
311
312
313 public synchronized boolean isFinished() {
314 switch (state) {
315 case ROLLEDBACK:
316 return true;
317 case FINISHED:
318 return exception == null;
319 default:
320 break;
321 }
322 return false;
323 }
324
325
326
327
328 public synchronized boolean isWaiting() {
329 switch (state) {
330 case WAITING:
331 case WAITING_TIMEOUT:
332 return true;
333 default:
334 break;
335 }
336 return false;
337 }
338
339 public synchronized RemoteProcedureException getException() {
340 return exception;
341 }
342
343 public long getStartTime() {
344 return startTime;
345 }
346
347 public synchronized long getLastUpdate() {
348 return lastUpdate;
349 }
350
351 public synchronized long elapsedTime() {
352 return lastUpdate - startTime;
353 }
354
355
356
357
358 protected void setTimeout(final int timeout) {
359 this.timeout = timeout;
360 }
361
362
363
364
365 public int getTimeout() {
366 return timeout;
367 }
368
369
370
371
372 public long getTimeRemaining() {
373 return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
374 }
375
376 @VisibleForTesting
377 @InterfaceAudience.Private
378 public void setOwner(final String owner) {
379 this.owner = StringUtils.isEmpty(owner) ? null : owner;
380 }
381
382 public String getOwner() {
383 return owner;
384 }
385
386 public boolean hasOwner() {
387 return owner != null;
388 }
389
390 @VisibleForTesting
391 @InterfaceAudience.Private
392 protected synchronized void setState(final ProcedureState state) {
393 this.state = state;
394 updateTimestamp();
395 }
396
397 @InterfaceAudience.Private
398 protected synchronized ProcedureState getState() {
399 return state;
400 }
401
402 protected void setFailure(final String source, final Throwable cause) {
403 setFailure(new RemoteProcedureException(source, cause));
404 }
405
406 protected synchronized void setFailure(final RemoteProcedureException exception) {
407 this.exception = exception;
408 if (!isFinished()) {
409 setState(ProcedureState.FINISHED);
410 }
411 }
412
413 protected void setAbortFailure(final String source, final String msg) {
414 setFailure(source, new ProcedureAbortedException(msg));
415 }
416
417 @InterfaceAudience.Private
418 protected synchronized boolean setTimeoutFailure() {
419 if (state == ProcedureState.WAITING_TIMEOUT) {
420 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
421 setFailure("ProcedureExecutor", new TimeoutIOException(
422 "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
423 return true;
424 }
425 return false;
426 }
427
428
429
430
431 @VisibleForTesting
432 @InterfaceAudience.Private
433 protected void setProcId(final long procId) {
434 this.procId = procId;
435 this.startTime = EnvironmentEdgeManager.currentTime();
436 setState(ProcedureState.RUNNABLE);
437 }
438
439
440
441
442 @InterfaceAudience.Private
443 protected void setParentProcId(final long parentProcId) {
444 this.parentProcId = parentProcId;
445 }
446
447
448
449
450 @VisibleForTesting
451 @InterfaceAudience.Private
452 protected void setNonceKey(final NonceKey nonceKey) {
453 this.nonceKey = nonceKey;
454 }
455
456
457
458
459
460 @InterfaceAudience.Private
461 protected Procedure[] doExecute(final TEnvironment env)
462 throws ProcedureYieldException, InterruptedException {
463 try {
464 updateTimestamp();
465 return execute(env);
466 } finally {
467 updateTimestamp();
468 }
469 }
470
471
472
473
474
475 @InterfaceAudience.Private
476 protected void doRollback(final TEnvironment env)
477 throws IOException, InterruptedException {
478 try {
479 updateTimestamp();
480 rollback(env);
481 } finally {
482 updateTimestamp();
483 }
484 }
485
486
487
488
489
490 @InterfaceAudience.Private
491 protected void setStartTime(final long startTime) {
492 this.startTime = startTime;
493 }
494
495
496
497
498
499 private synchronized void setLastUpdate(final long lastUpdate) {
500 this.lastUpdate = lastUpdate;
501 }
502
503 protected synchronized void updateTimestamp() {
504 this.lastUpdate = EnvironmentEdgeManager.currentTime();
505 }
506
507
508
509
510 @InterfaceAudience.Private
511 protected synchronized void setChildrenLatch(final int numChildren) {
512 this.childrenLatch = numChildren;
513 }
514
515
516
517
518 @InterfaceAudience.Private
519 protected synchronized void incChildrenLatch() {
520
521 this.childrenLatch++;
522 }
523
524
525
526
527
528 @InterfaceAudience.Private
529 protected synchronized boolean childrenCountDown() {
530 assert childrenLatch > 0;
531 return --childrenLatch == 0;
532 }
533
534
535
536
537
538 @InterfaceAudience.Private
539 protected synchronized void addStackIndex(final int index) {
540 if (stackIndexes == null) {
541 stackIndexes = new int[] { index };
542 } else {
543 int count = stackIndexes.length;
544 stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
545 stackIndexes[count] = index;
546 }
547 }
548
549 @InterfaceAudience.Private
550 protected synchronized boolean removeStackIndex() {
551 if (stackIndexes.length > 1) {
552 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
553 return false;
554 } else {
555 stackIndexes = null;
556 return true;
557 }
558 }
559
560
561
562
563
564 @InterfaceAudience.Private
565 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
566 this.stackIndexes = new int[stackIndexes.size()];
567 for (int i = 0; i < this.stackIndexes.length; ++i) {
568 this.stackIndexes[i] = stackIndexes.get(i);
569 }
570 }
571
572 @InterfaceAudience.Private
573 protected synchronized boolean wasExecuted() {
574 return stackIndexes != null;
575 }
576
577 @InterfaceAudience.Private
578 protected synchronized int[] getStackIndexes() {
579 return stackIndexes;
580 }
581
582 @Override
583 public int compareTo(final Procedure other) {
584 long diff = getProcId() - other.getProcId();
585 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
586 }
587
588
589
590
591
592 public static long getProcIdHashCode(final long procId) {
593 long h = procId;
594 h ^= h >> 16;
595 h *= 0x85ebca6b;
596 h ^= h >> 13;
597 h *= 0xc2b2ae35;
598 h ^= h >> 16;
599 return h;
600 }
601
602
603
604
605 @InterfaceAudience.Private
606 protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
607 while (proc.hasParent()) {
608 proc = procedures.get(proc.getParentProcId());
609 if (proc == null) return null;
610 }
611 return proc.getProcId();
612 }
613
614 protected static Procedure newInstance(final String className) throws IOException {
615 try {
616 Class<?> clazz = Class.forName(className);
617 if (!Modifier.isPublic(clazz.getModifiers())) {
618 throw new Exception("the " + clazz + " class is not public");
619 }
620
621 Constructor<?> ctor = clazz.getConstructor();
622 assert ctor != null : "no constructor found";
623 if (!Modifier.isPublic(ctor.getModifiers())) {
624 throw new Exception("the " + clazz + " constructor is not public");
625 }
626 return (Procedure)ctor.newInstance();
627 } catch (Exception e) {
628 throw new IOException("The procedure class " + className +
629 " must be accessible and have an empty constructor", e);
630 }
631 }
632
633 protected static void validateClass(final Procedure proc) throws IOException {
634 try {
635 Class<?> clazz = proc.getClass();
636 if (!Modifier.isPublic(clazz.getModifiers())) {
637 throw new Exception("the " + clazz + " class is not public");
638 }
639
640 Constructor<?> ctor = clazz.getConstructor();
641 assert ctor != null;
642 if (!Modifier.isPublic(ctor.getModifiers())) {
643 throw new Exception("the " + clazz + " constructor is not public");
644 }
645 } catch (Exception e) {
646 throw new IOException("The procedure class " + proc.getClass().getName() +
647 " must be accessible and have an empty constructor", e);
648 }
649 }
650
651
652
653
654 @InterfaceAudience.Private
655 public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
656 RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
657 return new ProcedureInfo(
658 proc.getProcId(),
659 proc.toStringClass(),
660 proc.getOwner(),
661 proc.getState(),
662 proc.hasParent() ? proc.getParentProcId() : -1,
663 nonceKey,
664 exception != null ?
665 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
666 proc.getLastUpdate(),
667 proc.getStartTime(),
668 proc.getResult());
669 }
670
671
672
673
674
675 @InterfaceAudience.Private
676 public static ProcedureProtos.Procedure convert(final Procedure proc)
677 throws IOException {
678 Preconditions.checkArgument(proc != null);
679 validateClass(proc);
680
681 ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
682 .setClassName(proc.getClass().getName())
683 .setProcId(proc.getProcId())
684 .setState(proc.getState())
685 .setStartTime(proc.getStartTime())
686 .setLastUpdate(proc.getLastUpdate());
687
688 if (proc.hasParent()) {
689 builder.setParentId(proc.getParentProcId());
690 }
691
692 if (proc.hasTimeout()) {
693 builder.setTimeout(proc.getTimeout());
694 }
695
696 if (proc.hasOwner()) {
697 builder.setOwner(proc.getOwner());
698 }
699
700 int[] stackIds = proc.getStackIndexes();
701 if (stackIds != null) {
702 for (int i = 0; i < stackIds.length; ++i) {
703 builder.addStackId(stackIds[i]);
704 }
705 }
706
707 if (proc.hasException()) {
708 RemoteProcedureException exception = proc.getException();
709 builder.setException(
710 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
711 }
712
713 byte[] result = proc.getResult();
714 if (result != null) {
715 builder.setResult(ByteStringer.wrap(result));
716 }
717
718 ByteString.Output stateStream = ByteString.newOutput();
719 proc.serializeStateData(stateStream);
720 if (stateStream.size() > 0) {
721 builder.setStateData(stateStream.toByteString());
722 }
723
724 if (proc.getNonceKey() != null) {
725 builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
726 builder.setNonce(proc.getNonceKey().getNonce());
727 }
728
729 return builder.build();
730 }
731
732
733
734
735
736
737
738
739
740
741 @InterfaceAudience.Private
742 public static Procedure convert(final ProcedureProtos.Procedure proto)
743 throws IOException {
744
745 Procedure proc = Procedure.newInstance(proto.getClassName());
746
747
748 proc.setProcId(proto.getProcId());
749 proc.setState(proto.getState());
750 proc.setStartTime(proto.getStartTime());
751 proc.setLastUpdate(proto.getLastUpdate());
752
753 if (proto.hasParentId()) {
754 proc.setParentProcId(proto.getParentId());
755 }
756
757 if (proto.hasOwner()) {
758 proc.setOwner(proto.getOwner());
759 }
760
761 if (proto.hasTimeout()) {
762 proc.setTimeout(proto.getTimeout());
763 }
764
765 if (proto.getStackIdCount() > 0) {
766 proc.setStackIndexes(proto.getStackIdList());
767 }
768
769 if (proto.hasException()) {
770 assert proc.getState() == ProcedureState.FINISHED ||
771 proc.getState() == ProcedureState.ROLLEDBACK :
772 "The procedure must be failed (waiting to rollback) or rolledback";
773 proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
774 }
775
776 if (proto.hasResult()) {
777 proc.setResult(proto.getResult().toByteArray());
778 }
779
780 if (proto.getNonce() != HConstants.NO_NONCE) {
781 NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
782 proc.setNonceKey(nonceKey);
783 }
784
785
786 proc.deserializeStateData(proto.getStateData().newInput());
787
788 return proc;
789 }
790 }