View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.Modifier;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Map;
29  
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38  import org.apache.hadoop.hbase.util.ByteStringer;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.util.NonceKey;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.base.Preconditions;
44  import com.google.protobuf.ByteString;
45  
46  /**
47   * Base Procedure class responsible to handle the Procedure Metadata
48   * e.g. state, startTime, lastUpdate, stack-indexes, ...
49   *
50   * execute() is called each time the procedure is executed.
51   * it may be called multiple times in case of failure and restart, so the
52   * code must be idempotent.
53   * the return is a set of sub-procedures or null in case the procedure doesn't
54   * have sub-procedures. Once the sub-procedures are successfully completed
55   * the execute() method is called again, you should think at it as a stack:
56   *  -> step 1
57   *  ---> step 2
58   *  -> step 1
59   *
60   * rollback() is called when the procedure or one of the sub-procedures is failed.
61   * the rollback step is supposed to cleanup the resources created during the
62   * execute() step. in case of failure and restart rollback() may be called
63   * multiple times, so the code must be idempotent.
64   */
65  @InterfaceAudience.Private
66  @InterfaceStability.Evolving
67  public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68    // unchanged after initialization
69    private String owner = null;
70    private Long parentProcId = null;
71    private Long procId = null;
72    private long startTime;
73  
74    // runtime state, updated every operation
75    private ProcedureState state = ProcedureState.INITIALIZING;
76    private Integer timeout = null;
77    private int[] stackIndexes = null;
78    private int childrenLatch = 0;
79    private long lastUpdate;
80  
81    private RemoteProcedureException exception = null;
82    private byte[] result = null;
83  
84    private NonceKey nonceKey = null;
85  
86    /**
87     * The main code of the procedure. It must be idempotent since execute()
88     * may be called multiple time in case of machine failure in the middle
89     * of the execution.
90     * @param env the environment passed to the ProcedureExecutor
91     * @return a set of sub-procedures or null if there is nothing else to execute.
92     * @throws ProcedureYieldException the procedure will be added back to the queue and retried later
93     * @throws InterruptedException the procedure will be added back to the queue and retried later
94     */
95    protected abstract Procedure[] execute(TEnvironment env)
96      throws ProcedureYieldException, InterruptedException;
97  
98    /**
99     * The code to undo what done by the execute() code.
100    * It is called when the procedure or one of the sub-procedure failed or an
101    * abort was requested. It should cleanup all the resources created by
102    * the execute() call. The implementation must be idempotent since rollback()
103    * may be called multiple time in case of machine failure in the middle
104    * of the execution.
105    * @param env the environment passed to the ProcedureExecutor
106    * @throws IOException temporary failure, the rollback will retry later
107    * @throws InterruptedException the procedure will be added back to the queue and retried later
108    */
109   protected abstract void rollback(TEnvironment env)
110     throws IOException, InterruptedException;
111 
112   /**
113    * The abort() call is asynchronous and each procedure must decide how to deal
114    * with that, if they want to be abortable. The simplest implementation
115    * is to have an AtomicBoolean set in the abort() method and then the execute()
116    * will check if the abort flag is set or not.
117    * abort() may be called multiple times from the client, so the implementation
118    * must be idempotent.
119    *
120    * NOTE: abort() is not like Thread.interrupt() it is just a notification
121    * that allows the procedure implementor where to abort to avoid leak and
122    * have a better control on what was executed and what not.
123    */
124   protected abstract boolean abort(TEnvironment env);
125 
126   /**
127    * The user-level code of the procedure may have some state to
128    * persist (e.g. input arguments) to be able to resume on failure.
129    * @param stream the stream that will contain the user serialized data
130    */
131   protected abstract void serializeStateData(final OutputStream stream)
132     throws IOException;
133 
134   /**
135    * Called on store load to allow the user to decode the previously serialized
136    * state.
137    * @param stream the stream that contains the user serialized data
138    */
139   protected abstract void deserializeStateData(final InputStream stream)
140     throws IOException;
141 
142   /**
143    * The user should override this method, and try to take a lock if necessary.
144    * A lock can be anything, and it is up to the implementor.
145    * Example: in our Master we can execute request in parallel for different tables
146    *          create t1 and create t2 can be executed at the same time.
147    *          anything else on t1/t2 is queued waiting that specific table create to happen.
148    *
149    * @return true if the lock was acquired and false otherwise
150    */
151   protected boolean acquireLock(final TEnvironment env) {
152     return true;
153   }
154 
155   /**
156    * The user should override this method, and release lock if necessary.
157    */
158   protected void releaseLock(final TEnvironment env) {
159     // no-op
160   }
161 
162   /**
163    * Called when the procedure is loaded for replay.
164    * The procedure implementor may use this method to perform some quick
165    * operation before replay.
166    * e.g. failing the procedure if the state on replay may be unknown.
167    */
168   protected void beforeReplay(final TEnvironment env) {
169     // no-op
170   }
171 
172   /**
173    * Called when the procedure is marked as completed (success or rollback).
174    * The procedure implementor may use this method to cleanup in-memory states.
175    * This operation will not be retried on failure.
176    */
177   protected void completionCleanup(final TEnvironment env) {
178     // no-op
179   }
180 
181   /**
182    * By default, the executor will try ro run procedures start to finish.
183    * Return true to make the executor yield between each execution step to
184    * give other procedures time to run their steps.
185    * @param env the environment passed to the ProcedureExecutor
186    * @return Return true if the executor should yield on completion of an execution step.
187    *         Defaults to return false.
188    */
189   protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
190     return false;
191   }
192 
193   /**
194    * By default, the executor will keep the procedure result around util
195    * the eviction TTL is expired. The client can cut down the waiting time
196    * by requesting that the result is removed from the executor.
197    * In case of system started procedure, we can force the executor to auto-ack.
198    * @param env the environment passed to the ProcedureExecutor
199    * @return true if the executor should wait the client ack for the result.
200    *         Defaults to return true.
201    */
202   protected boolean shouldWaitClientAck(final TEnvironment env) {
203     return true;
204   }
205 
206   @Override
207   public String toString() {
208     StringBuilder sb = new StringBuilder();
209     toStringClassDetails(sb);
210 
211     if (procId != null) {
212       sb.append(" id=");
213       sb.append(getProcId());
214     }
215 
216     if (hasParent()) {
217       sb.append(" parent=");
218       sb.append(getParentProcId());
219     }
220 
221     if (hasOwner()) {
222       sb.append(" owner=");
223       sb.append(getOwner());
224     }
225 
226     sb.append(" state=");
227     toStringState(sb);
228     return sb.toString();
229   }
230 
231   protected String toStringClass() {
232     StringBuilder sb = new StringBuilder();
233     toStringClassDetails(sb);
234 
235     return sb.toString();
236   }
237 
238   /**
239    * Called from {@link #toString()} when interpolating {@link Procedure} state
240    * @param builder Append current {@link ProcedureState}
241    */
242   protected void toStringState(StringBuilder builder) {
243     builder.append(getState());
244   }
245 
246   /**
247    * Extend the toString() information with the procedure details
248    * e.g. className and parameters
249    * @param builder the string builder to use to append the proc specific information
250    */
251   protected void toStringClassDetails(StringBuilder builder) {
252     builder.append(getClass().getName());
253   }
254 
255   /**
256    * @return the serialized result if any, otherwise null
257    */
258   public byte[] getResult() {
259     return result;
260   }
261 
262   /**
263    * The procedure may leave a "result" on completion.
264    * @param result the serialized result that will be passed to the client
265    */
266   protected void setResult(final byte[] result) {
267     this.result = result;
268   }
269 
270   public long getProcId() {
271     return procId;
272   }
273 
274   public boolean hasParent() {
275     return parentProcId != null;
276   }
277 
278   public boolean hasException() {
279     return exception != null;
280   }
281 
282   public boolean hasTimeout() {
283     return timeout != null;
284   }
285 
286   public long getParentProcId() {
287     return parentProcId;
288   }
289 
290   public NonceKey getNonceKey() {
291     return nonceKey;
292   }
293 
294   /**
295    * @return true if the procedure has failed.
296    *         true may mean failed but not yet rolledback or failed and rolledback.
297    */
298   public synchronized boolean isFailed() {
299     return exception != null || state == ProcedureState.ROLLEDBACK;
300   }
301 
302   /**
303    * @return true if the procedure is finished successfully.
304    */
305   public synchronized boolean isSuccess() {
306     return state == ProcedureState.FINISHED && exception == null;
307   }
308 
309   /**
310    * @return true if the procedure is finished. The Procedure may be completed
311    *         successfuly or failed and rolledback.
312    */
313   public synchronized boolean isFinished() {
314     switch (state) {
315       case ROLLEDBACK:
316         return true;
317       case FINISHED:
318         return exception == null;
319       default:
320         break;
321     }
322     return false;
323   }
324 
325   /**
326    * @return true if the procedure is waiting for a child to finish or for an external event.
327    */
328   public synchronized boolean isWaiting() {
329     switch (state) {
330       case WAITING:
331       case WAITING_TIMEOUT:
332         return true;
333       default:
334         break;
335     }
336     return false;
337   }
338 
339   public synchronized RemoteProcedureException getException() {
340     return exception;
341   }
342 
343   public long getStartTime() {
344     return startTime;
345   }
346 
347   public synchronized long getLastUpdate() {
348     return lastUpdate;
349   }
350 
351   public synchronized long elapsedTime() {
352     return lastUpdate - startTime;
353   }
354 
355   /**
356    * @param timeout timeout in msec
357    */
358   protected void setTimeout(final int timeout) {
359     this.timeout = timeout;
360   }
361 
362   /**
363    * @return the timeout in msec
364    */
365   public int getTimeout() {
366     return timeout;
367   }
368 
369   /**
370    * @return the remaining time before the timeout
371    */
372   public long getTimeRemaining() {
373     return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
374   }
375 
376   @VisibleForTesting
377   @InterfaceAudience.Private
378   public void setOwner(final String owner) {
379     this.owner = StringUtils.isEmpty(owner) ? null : owner;
380   }
381 
382   public String getOwner() {
383     return owner;
384   }
385 
386   public boolean hasOwner() {
387     return owner != null;
388   }
389 
390   @VisibleForTesting
391   @InterfaceAudience.Private
392   protected synchronized void setState(final ProcedureState state) {
393     this.state = state;
394     updateTimestamp();
395   }
396 
397   @InterfaceAudience.Private
398   protected synchronized ProcedureState getState() {
399     return state;
400   }
401 
402   protected void setFailure(final String source, final Throwable cause) {
403     setFailure(new RemoteProcedureException(source, cause));
404   }
405 
406   protected synchronized void setFailure(final RemoteProcedureException exception) {
407     this.exception = exception;
408     if (!isFinished()) {
409       setState(ProcedureState.FINISHED);
410     }
411   }
412 
413   protected void setAbortFailure(final String source, final String msg) {
414     setFailure(source, new ProcedureAbortedException(msg));
415   }
416 
417   @InterfaceAudience.Private
418   protected synchronized boolean setTimeoutFailure() {
419     if (state == ProcedureState.WAITING_TIMEOUT) {
420       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
421       setFailure("ProcedureExecutor", new TimeoutIOException(
422         "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
423       return true;
424     }
425     return false;
426   }
427 
428   /**
429    * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
430    */
431   @VisibleForTesting
432   @InterfaceAudience.Private
433   protected void setProcId(final long procId) {
434     this.procId = procId;
435     this.startTime = EnvironmentEdgeManager.currentTime();
436     setState(ProcedureState.RUNNABLE);
437   }
438 
439   /**
440    * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
441    */
442   @InterfaceAudience.Private
443   protected void setParentProcId(final long parentProcId) {
444     this.parentProcId = parentProcId;
445   }
446 
447   /**
448    * Called by the ProcedureExecutor to set the value to the newly created procedure.
449    */
450   @VisibleForTesting
451   @InterfaceAudience.Private
452   protected void setNonceKey(final NonceKey nonceKey) {
453     this.nonceKey = nonceKey;
454   }
455 
456   /**
457    * Internal method called by the ProcedureExecutor that starts the
458    * user-level code execute().
459    */
460   @InterfaceAudience.Private
461   protected Procedure[] doExecute(final TEnvironment env)
462       throws ProcedureYieldException, InterruptedException {
463     try {
464       updateTimestamp();
465       return execute(env);
466     } finally {
467       updateTimestamp();
468     }
469   }
470 
471   /**
472    * Internal method called by the ProcedureExecutor that starts the
473    * user-level code rollback().
474    */
475   @InterfaceAudience.Private
476   protected void doRollback(final TEnvironment env)
477       throws IOException, InterruptedException {
478     try {
479       updateTimestamp();
480       rollback(env);
481     } finally {
482       updateTimestamp();
483     }
484   }
485 
486   /**
487    * Called on store load to initialize the Procedure internals after
488    * the creation/deserialization.
489    */
490   @InterfaceAudience.Private
491   protected void setStartTime(final long startTime) {
492     this.startTime = startTime;
493   }
494 
495   /**
496    * Called on store load to initialize the Procedure internals after
497    * the creation/deserialization.
498    */
499   private synchronized void setLastUpdate(final long lastUpdate) {
500     this.lastUpdate = lastUpdate;
501   }
502 
503   protected synchronized void updateTimestamp() {
504     this.lastUpdate = EnvironmentEdgeManager.currentTime();
505   }
506 
507   /**
508    * Called by the ProcedureExecutor on procedure-load to restore the latch state
509    */
510   @InterfaceAudience.Private
511   protected synchronized void setChildrenLatch(final int numChildren) {
512     this.childrenLatch = numChildren;
513   }
514 
515   /**
516    * Called by the ProcedureExecutor on procedure-load to restore the latch state
517    */
518   @InterfaceAudience.Private
519   protected synchronized void incChildrenLatch() {
520     // TODO: can this be inferred from the stack? I think so...
521     this.childrenLatch++;
522   }
523 
524   /**
525    * Called by the ProcedureExecutor to notify that one of the sub-procedures
526    * has completed.
527    */
528   @InterfaceAudience.Private
529   protected synchronized boolean childrenCountDown() {
530     assert childrenLatch > 0;
531     return --childrenLatch == 0;
532   }
533 
534   /**
535    * Called by the RootProcedureState on procedure execution.
536    * Each procedure store its stack-index positions.
537    */
538   @InterfaceAudience.Private
539   protected synchronized void addStackIndex(final int index) {
540     if (stackIndexes == null) {
541       stackIndexes = new int[] { index };
542     } else {
543       int count = stackIndexes.length;
544       stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
545       stackIndexes[count] = index;
546     }
547   }
548 
549   @InterfaceAudience.Private
550   protected synchronized boolean removeStackIndex() {
551     if (stackIndexes.length > 1) {
552       stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
553       return false;
554     } else {
555       stackIndexes = null;
556       return true;
557     }
558   }
559 
560   /**
561    * Called on store load to initialize the Procedure internals after
562    * the creation/deserialization.
563    */
564   @InterfaceAudience.Private
565   protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
566     this.stackIndexes = new int[stackIndexes.size()];
567     for (int i = 0; i < this.stackIndexes.length; ++i) {
568       this.stackIndexes[i] = stackIndexes.get(i);
569     }
570   }
571 
572   @InterfaceAudience.Private
573   protected synchronized boolean wasExecuted() {
574     return stackIndexes != null;
575   }
576 
577   @InterfaceAudience.Private
578   protected synchronized int[] getStackIndexes() {
579     return stackIndexes;
580   }
581 
582   @Override
583   public int compareTo(final Procedure other) {
584     long diff = getProcId() - other.getProcId();
585     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
586   }
587 
588   /**
589    * Get an hashcode for the specified Procedure ID
590    * @return the hashcode for the specified procId
591    */
592   public static long getProcIdHashCode(final long procId) {
593     long h = procId;
594     h ^= h >> 16;
595     h *= 0x85ebca6b;
596     h ^= h >> 13;
597     h *= 0xc2b2ae35;
598     h ^= h >> 16;
599     return h;
600   }
601 
602   /*
603    * Helper to lookup the root Procedure ID given a specified procedure.
604    */
605   @InterfaceAudience.Private
606   protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
607     while (proc.hasParent()) {
608       proc = procedures.get(proc.getParentProcId());
609       if (proc == null) return null;
610     }
611     return proc.getProcId();
612   }
613 
614   protected static Procedure newInstance(final String className) throws IOException {
615     try {
616       Class<?> clazz = Class.forName(className);
617       if (!Modifier.isPublic(clazz.getModifiers())) {
618         throw new Exception("the " + clazz + " class is not public");
619       }
620 
621       Constructor<?> ctor = clazz.getConstructor();
622       assert ctor != null : "no constructor found";
623       if (!Modifier.isPublic(ctor.getModifiers())) {
624         throw new Exception("the " + clazz + " constructor is not public");
625       }
626       return (Procedure)ctor.newInstance();
627     } catch (Exception e) {
628       throw new IOException("The procedure class " + className +
629           " must be accessible and have an empty constructor", e);
630     }
631   }
632 
633   protected static void validateClass(final Procedure proc) throws IOException {
634     try {
635       Class<?> clazz = proc.getClass();
636       if (!Modifier.isPublic(clazz.getModifiers())) {
637         throw new Exception("the " + clazz + " class is not public");
638       }
639 
640       Constructor<?> ctor = clazz.getConstructor();
641       assert ctor != null;
642       if (!Modifier.isPublic(ctor.getModifiers())) {
643         throw new Exception("the " + clazz + " constructor is not public");
644       }
645     } catch (Exception e) {
646       throw new IOException("The procedure class " + proc.getClass().getName() +
647           " must be accessible and have an empty constructor", e);
648     }
649   }
650 
651   /**
652    * Helper to create the ProcedureInfo from Procedure.
653    */
654   @InterfaceAudience.Private
655   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
656     RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
657     return new ProcedureInfo(
658       proc.getProcId(),
659       proc.toStringClass(),
660       proc.getOwner(),
661       proc.getState(),
662       proc.hasParent() ? proc.getParentProcId() : -1,
663       nonceKey,
664       exception != null ?
665           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
666       proc.getLastUpdate(),
667       proc.getStartTime(),
668       proc.getResult());
669   }
670 
671   /**
672    * Helper to convert the procedure to protobuf.
673    * Used by ProcedureStore implementations.
674    */
675   @InterfaceAudience.Private
676   public static ProcedureProtos.Procedure convert(final Procedure proc)
677       throws IOException {
678     Preconditions.checkArgument(proc != null);
679     validateClass(proc);
680 
681     ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
682       .setClassName(proc.getClass().getName())
683       .setProcId(proc.getProcId())
684       .setState(proc.getState())
685       .setStartTime(proc.getStartTime())
686       .setLastUpdate(proc.getLastUpdate());
687 
688     if (proc.hasParent()) {
689       builder.setParentId(proc.getParentProcId());
690     }
691 
692     if (proc.hasTimeout()) {
693       builder.setTimeout(proc.getTimeout());
694     }
695 
696     if (proc.hasOwner()) {
697       builder.setOwner(proc.getOwner());
698     }
699 
700     int[] stackIds = proc.getStackIndexes();
701     if (stackIds != null) {
702       for (int i = 0; i < stackIds.length; ++i) {
703         builder.addStackId(stackIds[i]);
704       }
705     }
706 
707     if (proc.hasException()) {
708       RemoteProcedureException exception = proc.getException();
709       builder.setException(
710         RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
711     }
712 
713     byte[] result = proc.getResult();
714     if (result != null) {
715       builder.setResult(ByteStringer.wrap(result));
716     }
717 
718     ByteString.Output stateStream = ByteString.newOutput();
719     proc.serializeStateData(stateStream);
720     if (stateStream.size() > 0) {
721       builder.setStateData(stateStream.toByteString());
722     }
723 
724     if (proc.getNonceKey() != null) {
725       builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
726       builder.setNonce(proc.getNonceKey().getNonce());
727     }
728 
729     return builder.build();
730   }
731 
732   /**
733    * Helper to convert the protobuf procedure.
734    * Used by ProcedureStore implementations.
735    *
736    * TODO: OPTIMIZATION: some of the field never change during the execution
737    *                     (e.g. className, procId, parentId, ...).
738    *                     We can split in 'data' and 'state', and the store
739    *                     may take advantage of it by storing the data only on insert().
740    */
741   @InterfaceAudience.Private
742   public static Procedure convert(final ProcedureProtos.Procedure proto)
743       throws IOException {
744     // Procedure from class name
745     Procedure proc = Procedure.newInstance(proto.getClassName());
746 
747     // set fields
748     proc.setProcId(proto.getProcId());
749     proc.setState(proto.getState());
750     proc.setStartTime(proto.getStartTime());
751     proc.setLastUpdate(proto.getLastUpdate());
752 
753     if (proto.hasParentId()) {
754       proc.setParentProcId(proto.getParentId());
755     }
756 
757     if (proto.hasOwner()) {
758       proc.setOwner(proto.getOwner());
759     }
760 
761     if (proto.hasTimeout()) {
762       proc.setTimeout(proto.getTimeout());
763     }
764 
765     if (proto.getStackIdCount() > 0) {
766       proc.setStackIndexes(proto.getStackIdList());
767     }
768 
769     if (proto.hasException()) {
770       assert proc.getState() == ProcedureState.FINISHED ||
771              proc.getState() == ProcedureState.ROLLEDBACK :
772              "The procedure must be failed (waiting to rollback) or rolledback";
773       proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
774     }
775 
776     if (proto.hasResult()) {
777       proc.setResult(proto.getResult().toByteArray());
778     }
779 
780     if (proto.getNonce() != HConstants.NO_NONCE) {
781       NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
782       proc.setNonceKey(nonceKey);
783     }
784 
785     // we want to call deserialize even when the stream is empty, mainly for testing.
786     proc.deserializeStateData(proto.getStateData().newInput());
787 
788     return proc;
789   }
790 }